LLM Multi-Protocol Adapter (OpenAI / Anthropic / Responses + SSE + Account Pool)

LLM API Adapter Layer SOP — OpenAI / Anthropic / Responses Three-Protocol Conversion + SSE Streaming

Scope: building an OpenAI / Anthropic / Responses three-protocol proxy in front of any LLM backend; handling SSE streaming, account-pool retries, Cherry Studio empty responses, and the like. Core principles: an independent handler per protocol, all sharing one account pool and one 3-tier error classification; SSE must flush every event and terminate each with a \n\n double newline.


Part 1: Core SOP

This skill is a battle-tested reference for building proxy/adapter layers that expose three industry-standard LLM API interfaces on top of any LLM backend. It is language-agnostic — the patterns apply equally to Go, Python, TypeScript, Rust, or any HTTP server.

The three interfaces:

  1. OpenAI Chat Completions (POST /v1/chat/completions) — the most widely supported format
  2. Anthropic Messages (POST /v1/messages) — native Claude API format
  3. OpenAI Responses (POST /v1/responses) — newer OpenAI format with strict SSE requirements

All three share a common architecture: receive client request → translate to upstream format → call upstream → translate response back → return to client. The devil is in the SSE streaming details.


Architecture Overview

Client Request (any of 3 formats)
  → Route to handler based on path
  → Validate auth (Bearer token)
  → Translate request to upstream format
  → Select account/credential from pool (LRU or highest-balance)
  → Send to upstream LLM backend
  → On error: classify → retry with next account or return to client
  → On success: translate response back to client's format
  → Stream SSE events (or return JSON for non-streaming)
  → Client receives response

Endpoints to implement

Path                 | Method | Format             | Auth Header
/v1/models           | GET    | OpenAI             | Authorization: Bearer <key>
/v1/chat/completions | POST   | OpenAI Chat        | Authorization: Bearer <key>
/v1/messages         | POST   | Anthropic Messages | Authorization: Bearer <key> or x-api-key: <key>
/v1/responses        | POST   | OpenAI Responses   | Authorization: Bearer <key>

HTTP Headers for SSE streaming

Every streaming endpoint must set these headers before writing any body:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

Additionally, set X-Accel-Buffering: no if behind nginx to prevent buffering.

CORS

Allow all origins for API proxy use:

Access-Control-Allow-Origin: *
Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS
Access-Control-Allow-Headers: Content-Type, Authorization, X-API-Key

Handle OPTIONS preflight with 200 and no body.
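
As an illustration, here is a minimal FastAPI sketch (the framework is an assumption; only the header values and the preflight behavior come from this document) that answers OPTIONS with an empty 200 and sets the streaming headers:

from fastapi import FastAPI
from fastapi.responses import Response, StreamingResponse

app = FastAPI()

CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, Authorization, X-API-Key",
}

@app.options("/{path:path}")
async def preflight(path: str):
    # OPTIONS preflight: 200, CORS headers, no body.
    return Response(status_code=200, headers=CORS_HEADERS)

@app.post("/v1/chat/completions")
async def chat_completions():
    async def events():
        # Each SSE event must end with a double newline.
        yield b'data: {"ok":true}\n\n'
    return StreamingResponse(
        events(),
        media_type="text/event-stream",   # sets Content-Type
        headers={
            **CORS_HEADERS,
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",    # prevent nginx buffering
        },
    )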


1. OpenAI Chat Completions (/v1/chat/completions)

This is the simplest interface — if upstream is already OpenAI-compatible, it's mostly passthrough.

Request format

{
  "model": "claude-sonnet-4-6",
  "messages": [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there!"},
    {"role": "user", "content": "How are you?"}
  ],
  "stream": true,
  "max_tokens": 4096,
  "temperature": 0.7,
  "tools": [...],
  "tool_choice": "auto"
}

Streaming SSE format

Each event is a single data: <JSON> line terminated by \n\n. The stream ends with data: [DONE]\n\n.

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1700000000,"model":"claude-sonnet-4-6","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1700000000,"model":"claude-sonnet-4-6","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1700000000,"model":"claude-sonnet-4-6","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":5,"total_tokens":15}}

data: [DONE]

Key fields in each chunk:

  • id: Consistent across all chunks in one response (format: chatcmpl-<random>)
  • object: Always "chat.completion.chunk"
  • choices[0].delta.role: Only in the first chunk ("assistant")
  • choices[0].delta.content: Text fragment (may be an empty string or absent entirely)
  • choices[0].finish_reason: null until final chunk, then "stop" or "tool_calls" or "length"
  • usage: Only in the final chunk (some upstreams omit this)

Non-streaming response

{
  "id": "chatcmpl-abc123",
  "object": "chat.completion",
  "created": 1700000000,
  "model": "claude-sonnet-4-6",
  "choices": [{
    "index": 0,
    "message": {"role": "assistant", "content": "Hello!"},
    "finish_reason": "stop"
  }],
  "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
}

Tool calls in streaming

Tool calls arrive as incremental chunks in delta.tool_calls:

data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_abc","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}

data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"loc"}}]}}]}

data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ation\": \"SF\"}"}}]}}]}

The id and name only appear in the first chunk for that tool call index. Subsequent chunks only have arguments fragments. Accumulate them client-side.
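
A minimal client-side accumulator sketch in Python (function and variable names are illustrative; chunks is assumed to be the parsed data: JSON payloads):

def accumulate_tool_calls(chunks):
    calls = {}  # tool call index -> {"id", "name", "arguments"}
    for chunk in chunks:
        delta = chunk.get("choices", [{}])[0].get("delta", {})
        for tc in delta.get("tool_calls", []):
            slot = calls.setdefault(tc["index"], {"id": "", "name": "", "arguments": ""})
            if tc.get("id"):
                slot["id"] = tc["id"]                     # only in the first fragment
            fn = tc.get("function", {})
            if fn.get("name"):
                slot["name"] = fn["name"]                 # only in the first fragment
            slot["arguments"] += fn.get("arguments", "")  # concatenate fragments
    return [calls[i] for i in sorted(calls)]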


2. Anthropic Messages (/v1/messages)

Request format

{
  "model": "claude-sonnet-4-6",
  "system": "You are helpful.",
  "messages": [
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": [
      {"type": "text", "text": "Hi!"}
    ]},
    {"role": "user", "content": [
      {"type": "text", "text": "How are you?"}
    ]}
  ],
  "max_tokens": 4096,
  "stream": true,
  "thinking": {"type": "enabled", "budget_tokens": 10000},
  "tools": [{"name": "get_weather", "description": "...", "input_schema": {...}}],
  "tool_choice": {"type": "auto"}
}

Key differences from OpenAI:

  • system is a top-level field (string or array of {type: "text", text: "..."} blocks)
  • content can be string or array of typed blocks (text, image, tool_use, tool_result, thinking)
  • tools[].input_schema instead of tools[].function.parameters
  • tool_choice is {type: "auto"|"any"|"tool"|"none"} not a string
  • thinking field for extended thinking mode

Streaming SSE format — FULL EVENT SEQUENCE

The Anthropic SSE format uses event: <type>\ndata: <JSON>\n\n. The event sequence is strict:

event: message_start
data: {"type":"message_start","message":{"id":"msg_abc","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-6","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}}}

event: ping
data: {"type":"ping"}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" there!"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":15}}

event: message_stop
data: {"type":"message_stop"}

Critical details:

  • ping event after message_start — Claude Code and other clients expect this
  • content_block_start → content_block_delta (one or more) → content_block_stop per block
  • Block index increments for each content block (text, thinking, tool_use)
  • message_delta carries the final stop_reason and output usage
  • message_stop is the terminator (no [DONE] marker)
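
For reference, a small Python helper sketch that writes one event in this framing; write and flush stand in for whatever your server's response writer provides:

import json

def emit_anthropic_event(write, flush, event_type, payload):
    # Anthropic framing: "event: <type>\n" + "data: <single-line JSON>\n\n".
    write(f"event: {event_type}\n")
    write(f"data: {json.dumps(payload, separators=(',', ':'))}\n\n")
    flush()  # flush every event so the client sees it immediately

# e.g. emit_anthropic_event(write, flush, "ping", {"type": "ping"})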

Thinking blocks (extended thinking)

When thinking is enabled, a thinking block comes before the text block:

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":"","signature":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"Let me think..."}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":""}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: content_block_start
data: {"type":"content_block_start","index":1,"content_block":{"type":"text","text":""}}

The signature_delta must be sent before closing the thinking block, even if the signature is empty.

Tool use in streaming

event: content_block_start
data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_abc","name":"get_weather","input":{}}}

event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":"{\"location\":"}}

event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":" \"SF\"}"}}

event: content_block_stop
data: {"type":"content_block_stop","index":1}

When tool use is the stop reason: stop_reason = "tool_use" in message_delta.

Non-streaming response

{
  "id": "msg_abc",
  "type": "message",
  "role": "assistant",
  "content": [
    {"type": "text", "text": "Hello!"}
  ],
  "model": "claude-sonnet-4-6",
  "stop_reason": "end_turn",
  "stop_sequence": null,
  "usage": {
    "input_tokens": 10,
    "output_tokens": 5,
    "cache_creation_input_tokens": 0,
    "cache_read_input_tokens": 0
  }
}

Stop reason mapping (OpenAI ↔ Anthropic)

OpenAI finish_reason | Anthropic stop_reason
"stop"               | "end_turn"
"tool_calls"         | "tool_use"
"length"             | "max_tokens"

3. OpenAI Responses API (/v1/responses)

This is the strictest SSE format. Clients like Cherry Studio validate every field — missing sequence_number or item_id causes empty renders.

Request format

{
  "model": "claude-sonnet-4-6",
  "input": "Hello",
  "stream": true,
  "max_output_tokens": 4096,
  "instructions": "You are helpful.",
  "temperature": 0.7,
  "tools": [...],
  "tool_choice": "auto"
}

input can be:

  • A string (simple prompt → converted to single user message)
  • An array of message objects with role and content (content blocks use input_text/output_text types)

Streaming SSE format — COMPLETE EVENT SEQUENCE

Every event has event: <type>\ndata: <JSON>\n\n. Every data payload must include sequence_number (incrementing integer starting at 0).

event: response.created
data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_abc","object":"response","created_at":1700000000,"status":"in_progress","model":"claude-sonnet-4-6","output":[],"usage":null,"error":null,"incomplete_details":null,"instructions":null,"metadata":{},"parallel_tool_calls":true,"temperature":1.0,"tool_choice":"auto","tools":[],"top_p":1.0,"max_output_tokens":null,"previous_response_id":null,"reasoning":{"effort":null,"summary":null},"store":true,"truncation":"disabled","user":null}}

event: response.in_progress
data: {"type":"response.in_progress","sequence_number":1,"response":{...same as above...}}

event: response.output_item.added
data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"type":"message","id":"msg_abc","status":"in_progress","role":"assistant","content":[]}}

event: response.content_part.added
data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"","annotations":[]}}

event: response.output_text.delta
data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_abc","output_index":0,"content_index":0,"delta":"Hello"}

event: response.output_text.delta
data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_abc","output_index":0,"content_index":0,"delta":" there!"}

event: response.output_text.done
data: {"type":"response.output_text.done","sequence_number":6,"item_id":"msg_abc","output_index":0,"content_index":0,"text":"Hello there!"}

event: response.content_part.done
data: {"type":"response.content_part.done","sequence_number":7,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello there!","annotations":[]}}

event: response.output_item.done
data: {"type":"response.output_item.done","sequence_number":8,"output_index":0,"item":{"type":"message","id":"msg_abc","status":"completed","role":"assistant","content":[{"type":"output_text","text":"Hello there!","annotations":[]}]}}

event: response.completed
data: {"type":"response.completed","sequence_number":9,"response":{"id":"resp_abc","object":"response","created_at":1700000000,"status":"completed","model":"claude-sonnet-4-6","output":[...],"usage":{"input_tokens":10,"output_tokens":5,"total_tokens":15},"error":null,...all other fields...}}

Mandatory fields clients validate

These are the fields that cause empty responses in strict clients if missing:

Field                      | Where                                                                       | Why
sequence_number            | Every SSE event                                                             | Clients use this for ordering and deduplication
item_id                    | content_part.added, output_text.delta, output_text.done, content_part.done | Associates content with its parent message item
annotations                | Inside part objects (output_text)                                           | Array (can be empty []) — missing = parse failure
response.in_progress event | After response.created                                                      | Some clients wait for this before processing content
Full response metadata     | In response.created and response.completed                                  | Fields like metadata, tools, reasoning, temperature, etc.
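
One way to keep these invariants is to centralize emission in a helper that stamps sequence_number automatically (the same trick the reference section below uses). A Python sketch, with the writer plumbing assumed:

import json

class ResponsesEmitter:
    # Sketch: every event gets the next sequence_number, then is flushed.
    def __init__(self, write, flush):
        self.write, self.flush, self.seq = write, flush, 0

    def emit(self, event_type, payload):
        payload["sequence_number"] = self.seq
        self.seq += 1
        self.write(f"event: {event_type}\n")
        self.write(f"data: {json.dumps(payload, separators=(',', ':'))}\n\n")
        self.flush()

# e.g. emitter.emit("response.output_text.delta",
#                   {"type": "response.output_text.delta", "item_id": "msg_abc",
#                    "output_index": 0, "content_index": 0, "delta": "Hello"})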

Tool calls (function_call) in streaming

Tool calls in Responses API are separate function_call output items (not embedded in a message):

event: response.output_item.added
data: {"type":"response.output_item.added","sequence_number":N,"output_index":1,"item":{"type":"function_call","id":"fc_abc","call_id":"call_abc","name":"get_weather","arguments":"","status":"in_progress"}}

event: response.function_call_arguments.delta
data: {"type":"response.function_call_arguments.delta","sequence_number":N+1,"item_id":"fc_abc","output_index":1,"delta":"{\"location\":"}

event: response.function_call_arguments.done
data: {"type":"response.function_call_arguments.done","sequence_number":N+2,"item_id":"fc_abc","output_index":1,"arguments":"{\"location\": \"SF\"}"}

event: response.output_item.done
data: {"type":"response.output_item.done","sequence_number":N+3,"output_index":1,"item":{"type":"function_call","id":"fc_abc","call_id":"call_abc","name":"get_weather","arguments":"{\"location\": \"SF\"}","status":"completed"}}

Non-streaming response

{
  "id": "resp_abc",
  "object": "response",
  "created_at": 1700000000,
  "status": "completed",
  "model": "claude-sonnet-4-6",
  "output": [
    {
      "type": "message",
      "id": "msg_abc",
      "status": "completed",
      "role": "assistant",
      "content": [{"type": "output_text", "text": "Hello!", "annotations": []}]
    }
  ],
  "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
}

Format Conversion Rules

When your upstream speaks one format and your client speaks another, you need bidirectional conversion.

Anthropic → OpenAI (request translation)

Anthropic                            | OpenAI
system (string or block array)       | messages[0] with role: "system"
messages[].content (block array)     | Flatten: text blocks → {type:"text", text:...}; image blocks → {type:"image_url", image_url:{url:...}}
tool_use content blocks              | assistant message's tool_calls array: {id, type:"function", function:{name, arguments: JSON.stringify(input)}}
tool_result content blocks           | Separate {role:"tool", tool_call_id, content} message
thinking / redacted_thinking blocks  | Skip (upstream typically doesn't support them)
tools[].input_schema                 | tools[].function.parameters
tool_choice: {type:"auto"}           | tool_choice: "auto"
tool_choice: {type:"any"}            | tool_choice: "required"
tool_choice: {type:"tool", name:"X"} | tool_choice: {type:"function", function:{name:"X"}}
tool_choice: {type:"none"}           | tool_choice: "none"
max_tokens                           | max_tokens (pass through)

OpenAI → Anthropic (response translation)

OpenAI Response               | Anthropic Response
choices[0].message.content    | content: [{type:"text", text:...}]
choices[0].message.tool_calls | content: [{type:"tool_use", id, name, input: JSON.parse(arguments)}]
finish_reason: "stop"         | stop_reason: "end_turn"
finish_reason: "tool_calls"   | stop_reason: "tool_use"
finish_reason: "length"       | stop_reason: "max_tokens"
usage.prompt_tokens           | usage.input_tokens
usage.completion_tokens       | usage.output_tokens

Responses API → Chat Completions (request translation)

Responses API                    | Chat Completions
input (string)                   | messages: [{role:"user", content: input}]
input (array of message objects) | Convert each: input_text/output_text blocks → plain text
instructions                     | messages[0] with role: "system"
max_output_tokens                | max_tokens
tools / tool_choice              | Pass through (same format)

Chat Completions → Responses API (response translation)

Chat Completions Response     | Responses API Response
choices[0].message.content    | output: [{type:"message", content:[{type:"output_text", text:..., annotations:[]}]}]
choices[0].message.tool_calls | output: [{type:"function_call", id, call_id, name, arguments, status:"completed"}]
usage.prompt_tokens           | usage.input_tokens
usage.completion_tokens       | usage.output_tokens

Error Handling & Account Pool Retry

When proxying through a pool of accounts/credentials, errors need a 3-tier classification to decide among returning the error to the client, retrying with the next account, and disabling the account before retrying.

3-Tier Error Classification

Upstream returns non-200
  │
  ├─ Tier 1: Request Too Large (403 + "estimated cost")
  │  → Return error to client immediately
  │  → No account is at fault; the request exceeds any account's limit
  │
  ├─ Tier 2: Insufficient Tokens (403 + "insufficient tokens"|"upgrade your plan"|"limit reached")
  │  → Retry with next account (don't disable current one)
  │  → This account might recover; another might have enough
  │
  ├─ Tier 3: Account Exhausted (429 or 402)
  │  → Disable account, retry with next
  │  → Account is definitively out of quota
  │
  ├─ Auth Invalid (401)
  │  → Disable account, retry with next
  │  → Token expired or revoked
  │
  └─ Other errors
     → Return error to client
     → Unknown issue, don't mask it
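
A Python sketch of this classifier; the marker strings are the ones quoted in the tree above and should be treated as assumptions to adapt to your upstream's actual error bodies (the retry loop below factors the same logic into per-tier predicates):

from enum import Enum, auto

class Action(Enum):
    RETURN_TO_CLIENT = auto()
    RETRY_NEXT_ACCOUNT = auto()
    DISABLE_AND_RETRY = auto()

def classify_upstream_error(status: int, body: str) -> Action:
    text = body.lower()
    # Tier 1: the request itself is too large; no account can serve it.
    if status == 403 and "estimated cost" in text:
        return Action.RETURN_TO_CLIENT
    # Tier 2: this account is short on tokens but may recover later.
    if status == 403 and any(m in text for m in
            ("insufficient tokens", "upgrade your plan", "limit reached")):
        return Action.RETRY_NEXT_ACCOUNT
    # Tier 3: account definitively out of quota.
    if status in (429, 402):
        return Action.DISABLE_AND_RETRY
    # Expired or revoked credentials.
    if status == 401:
        return Action.DISABLE_AND_RETRY
    # Unknown: surface to the client, don't mask it.
    return Action.RETURN_TO_CLIENT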

Retry loop pattern (pseudocode)

MAX_RETRIES = 10
tried_ids = []

for attempt in range(MAX_RETRIES):
    account = get_active_account(exclude=tried_ids)
    if not account:
        return 503 "No active accounts available"
    
    tried_ids.append(account.id)
    mark_account_used(account.id)
    
    response = call_upstream(account.api_key, request_payload)
    
    if response.status == 200:
        stream_or_return(response)
        return
    
    error_body = response.body
    
    if is_request_too_large(response.status, error_body):
        return error_to_client(response.status, error_body)  # No retry
    
    if is_insufficient_tokens(response.status, error_body):
        continue  # Try next account, don't disable
    
    if is_token_exhausted(response.status, error_body):
        disable_account(account.id)
        continue  # Try next account
    
    if response.status == 401:
        disable_account(account.id)
        continue  # Try next account
    
    # Unknown error — return to client
    return error_to_client(response.status, error_body)

return 503 "All accounts exhausted"

This pattern must be applied identically across all six code paths (3 interfaces × 2 modes: streaming and non-streaming).

Proxy failure handling

If the upstream request fails at the network level (connection refused, timeout):

  • Mark the proxy as failed (if using proxy rotation)
  • Retry with next account (which may get a different proxy)
  • Don't disable the account — it wasn't the account's fault

Anthropic Compatibility Edge Cases

These are real-world issues that cause 400 errors or broken streams if not handled.

Orphaned tool_use blocks

When clients truncate conversation history, an assistant message may contain tool_use blocks without a corresponding tool_result in the next user message. Anthropic's API rejects this. Fix by scanning messages and inserting synthetic tool_result blocks:

{
  "type": "tool_result",
  "tool_use_id": "<orphaned_id>",
  "content": "[Tool result unavailable - conversation history was truncated]",
  "is_error": true
}
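
A repair-pass sketch in Python (assuming parsed Anthropic-style message dicts): it prepends synthetic tool_result blocks to the user turn that follows, or inserts a new user turn when none exists:

def repair_orphaned_tool_use(messages):
    fixed = [dict(m) for m in messages]   # shallow copies; content lists rebuilt below
    i = 0
    while i < len(fixed):
        msg = fixed[i]
        if msg.get("role") == "assistant" and isinstance(msg.get("content"), list):
            tool_ids = [b["id"] for b in msg["content"] if b.get("type") == "tool_use"]
            nxt = fixed[i + 1] if i + 1 < len(fixed) else None
            answered = set()
            if nxt and nxt.get("role") == "user" and isinstance(nxt.get("content"), list):
                answered = {b.get("tool_use_id") for b in nxt["content"]
                            if b.get("type") == "tool_result"}
            synthetic = [{"type": "tool_result", "tool_use_id": tid,
                          "content": "[Tool result unavailable - conversation history was truncated]",
                          "is_error": True}
                         for tid in tool_ids if tid not in answered]
            if synthetic:
                if nxt and nxt.get("role") == "user" and isinstance(nxt.get("content"), list):
                    nxt["content"] = synthetic + nxt["content"]  # results lead the turn
                else:
                    fixed.insert(i + 1, {"role": "user", "content": synthetic})
        i += 1
    return fixed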

cache_control stripping

If the upstream doesn't support Anthropic's prompt caching, strip cache_control fields from:

  • system blocks
  • messages[].content blocks
  • tools array items

thinking field stripping

If the upstream doesn't support extended thinking, delete the thinking field from the request before forwarding.
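
Both strips fit naturally into one sanitizer pass over the parsed request. A Python sketch:

def sanitize_for_upstream(payload: dict) -> dict:
    # Drop extended-thinking config if the upstream can't honor it.
    payload.pop("thinking", None)

    def strip_cc(blocks):
        for b in blocks:
            if isinstance(b, dict):
                b.pop("cache_control", None)

    # cache_control can appear on system blocks, message content blocks, and tools.
    if isinstance(payload.get("system"), list):
        strip_cc(payload["system"])
    for msg in payload.get("messages", []):
        if isinstance(msg.get("content"), list):
            strip_cc(msg["content"])
    strip_cc(payload.get("tools", []))
    return payload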

Image format conversion

Anthropic uses {type:"image", source:{type:"base64", media_type:"image/png", data:"..."}}. OpenAI uses {type:"image_url", image_url:{url:"data:image/png;base64,..."}}. Convert between them when translating.
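
Both directions as a Python sketch:

def anthropic_image_to_openai(block: dict) -> dict:
    # {"type":"image","source":{...}} -> {"type":"image_url","image_url":{...}}
    src = block["source"]
    if src["type"] == "base64":
        url = f"data:{src['media_type']};base64,{src['data']}"
    else:  # {"type": "url", "url": ...}
        url = src["url"]
    return {"type": "image_url", "image_url": {"url": url}}

def openai_image_to_anthropic(part: dict) -> dict:
    # data: URL -> base64 source; anything else passes through as a url source.
    url = part["image_url"]["url"]
    if url.startswith("data:"):
        header, data = url.split(",", 1)
        media_type = header[len("data:"):].split(";", 1)[0]
        return {"type": "image", "source": {
            "type": "base64", "media_type": media_type, "data": data}}
    return {"type": "image", "source": {"type": "url", "url": url}}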


Model Name Mapping

Support both short aliases and provider-prefixed names. Map legacy names to current ones:

# Alias → Provider-prefixed
claude-sonnet-4-6       → anthropic/claude-sonnet-4-6
claude-opus-4-6         → anthropic/claude-opus-4-6
gpt-5-mini              → openai/gpt-5-mini

# Legacy → Current
claude-3-5-sonnet-20241022 → anthropic/claude-sonnet-4-5
claude-3-opus-20240229     → anthropic/claude-opus-4-6

# Auto-prefix by pattern
claude-*  → anthropic/
gpt-*     → openai/
gemini-*  → google/
grok-*    → xai/

The /v1/models endpoint should return both upstream models and local aliases (deduplicated). Cache with a TTL (e.g., 5 minutes).
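
A lookup-then-pattern resolver sketch in Python, using the mappings above (keeping the tables as plain data makes them easy to extend):

ALIASES = {
    "claude-sonnet-4-6": "anthropic/claude-sonnet-4-6",
    "claude-opus-4-6": "anthropic/claude-opus-4-6",
    "gpt-5-mini": "openai/gpt-5-mini",
    # Legacy -> current
    "claude-3-5-sonnet-20241022": "anthropic/claude-sonnet-4-5",
    "claude-3-opus-20240229": "anthropic/claude-opus-4-6",
}

PREFIX_RULES = [("claude-", "anthropic/"), ("gpt-", "openai/"),
                ("gemini-", "google/"), ("grok-", "xai/")]

def map_model(name: str) -> str:
    if "/" in name:                  # already provider-prefixed
        return name
    if name in ALIASES:              # exact alias (including legacy remaps)
        return ALIASES[name]
    for prefix, provider in PREFIX_RULES:
        if name.startswith(prefix):  # auto-prefix by pattern
            return provider + name
    return name                      # unknown: pass through unchanged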


Implementation Checklist

Use this when building a new adapter or auditing an existing one.

Per-endpoint checklist

  • [ ] Auth validation — check Bearer token before processing
  • [ ] Request parsing — handle both stream: true and stream: false
  • [ ] Model mapping — translate aliases to upstream model IDs
  • [ ] Format translation — convert request to upstream format
  • [ ] Error classification — implement all 3 tiers + 401 handling
  • [ ] Retry loop — up to N accounts, track tried IDs
  • [ ] SSE headers — Content-Type, Cache-Control, Connection, X-Accel-Buffering
  • [ ] Flush after each event — critical for real-time streaming
  • [ ] Usage tracking — capture token counts from upstream, include in final event

Per-format checklist

OpenAI Chat Completions:

  • [ ] Passthrough streaming (if upstream is OpenAI-compatible)
  • [ ] data: [DONE] terminator
  • [ ] Consistent id across chunks

Anthropic Messages:

  • [ ] message_start → ping → content blocks → message_delta → message_stop
  • [ ] Block index tracking (increments per block)
  • [ ] Thinking blocks with signature_delta before close
  • [ ] Tool use: content_block_start with tool_use type → input_json_delta chunks
  • [ ] cache_creation_input_tokens and cache_read_input_tokens in usage (even if 0)

OpenAI Responses:

  • [ ] sequence_number on every event (incrementing from 0)
  • [ ] item_id on content_part and text delta/done events
  • [ ] response.in_progress event after response.created
  • [ ] annotations: [] in all output_text parts
  • [ ] Full response metadata in response.created and response.completed
  • [ ] function_call as separate output items (not embedded in message)

Debugging Empty Responses

When a client shows empty output but curl works, check these in order:

  1. Missing sequence_number (Responses API) — most common cause for Cherry Studio
  2. Missing item_id on delta events (Responses API) — client can't associate text with item
  3. Missing response.in_progress event — some clients wait for it
  4. Missing annotations: [] in output_text parts — causes parse failures
  5. Missing ping event (Anthropic) — Claude Code may stall
  6. No flush after each SSE event — data sits in buffer, never reaches client
  7. Nginx buffering — missing X-Accel-Buffering: no header
  8. Content-Type wrong — must be text/event-stream, not application/json
  9. Double newline missing — SSE events must end with \n\n, not just \n
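
When working through this checklist by hand, it helps to dump the raw bytes rather than trusting any client. A small Python probe; httpx as the HTTP library and the endpoint/key details are assumptions:

import httpx

def dump_raw_sse(url, api_key, payload):
    # Print status, content-type, and every raw line with repr() so a missing
    # blank line, event: field, or sequence_number gap is plainly visible.
    headers = {"Authorization": f"Bearer {api_key}"}
    with httpx.stream("POST", url, json=payload, headers=headers, timeout=120) as resp:
        print(resp.status_code, resp.headers.get("content-type"))
        for line in resp.iter_lines():
            print(repr(line))

# e.g. dump_raw_sse("http://localhost:8080/v1/responses", "sk-test",
#                   {"model": "claude-sonnet-4-6", "input": "Hello", "stream": True})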

For more details on SSE wire format requirements, see the SSE Wire Format section below. For format conversion code patterns, see the Conversion Patterns section below.


Part 2: Reference Material (references/)

Conversion Patterns

Language-agnostic pseudocode for converting between all three LLM API formats. Real-world patterns extracted from a production proxy handling 470+ accounts.

Table of Contents

  1. Anthropic Request → OpenAI Request
  2. OpenAI Response → Anthropic Response
  3. OpenAI Stream → Anthropic Stream
  4. Responses Request → Chat Completions Request
  5. Chat Completions Stream → Responses Stream
  6. Chat Completions Response → Responses Response
  7. Tool Format Conversion

Anthropic to OpenAI Request

function translateAnthropicToOpenAI(anthropicPayload):
    openai = {
        model: anthropicPayload.model,
        messages: [],
        max_tokens: anthropicPayload.max_tokens ?? 4096,
        stream: false
    }
    
    // Pass through optional params
    if anthropicPayload.temperature: openai.temperature = anthropicPayload.temperature
    if anthropicPayload.top_p: openai.top_p = anthropicPayload.top_p
    
    // System message
    if anthropicPayload.system:
        if typeof system == string:
            openai.messages.push({role: "system", content: system})
        if typeof system == array:
            text = system.filter(b => b.type == "text").map(b => b.text).join("")
            openai.messages.push({role: "system", content: text})
    
    // Messages
    for msg in anthropicPayload.messages:
        if typeof msg.content == string:
            openai.messages.push({role: msg.role, content: msg.content})
            continue
        
        // Array content — process each block
        openContent = []
        toolCalls = []
        
        for block in msg.content:
            switch block.type:
                case "text":
                    openContent.push({type: "text", text: block.text})
                
                case "thinking", "redacted_thinking":
                    skip  // Upstream doesn't support thinking
                
                case "image":
                    if block.source.type == "base64":
                        url = "data:{block.source.media_type};base64,{block.source.data}"
                        openContent.push({type: "image_url", image_url: {url}})
                    else if block.source.type == "url":
                        openContent.push({type: "image_url", image_url: {url: block.source.url}})
                
                case "tool_use":
                    toolCalls.push({
                        id: block.id,
                        type: "function",
                        function: {name: block.name, arguments: JSON.stringify(block.input)}
                    })
                
                case "tool_result":
                    // Tool results become separate messages
                    resultContent = ""
                    if typeof block.content == string:
                        resultContent = block.content
                    else if typeof block.content == array:
                        resultContent = block.content.filter(b => b.text).map(b => b.text).join("")
                    openai.messages.push({
                        role: "tool",
                        tool_call_id: block.tool_use_id,
                        content: resultContent
                    })
        
        // Build the message
        if openContent.length > 0 or toolCalls.length > 0:
            msgBody = {role: msg.role}
            
            if openContent.length > 0:
                // Simplify: single text block with no images → plain string
                if openContent.length == 1 and openContent[0].type == "text":
                    msgBody.content = openContent[0].text
                else:
                    msgBody.content = openContent
            
            if toolCalls.length > 0:
                msgBody.tool_calls = toolCalls
            
            openai.messages.push(msgBody)
    
    // Tools
    if anthropicPayload.tools:
        openai.tools = anthropicPayload.tools.map(t => ({
            type: "function",
            function: {
                name: t.name,
                description: t.description,
                parameters: t.input_schema
            }
        }))
        
        // Tool choice mapping
        if anthropicPayload.tool_choice:
            switch anthropicPayload.tool_choice.type:
                case "auto": openai.tool_choice = "auto"
                case "any": openai.tool_choice = "required"
                case "none": openai.tool_choice = "none"
                case "tool":
                    openai.tool_choice = {
                        type: "function",
                        function: {name: anthropicPayload.tool_choice.name}
                    }
    
    return openai

OpenAI to Anthropic Response

function openaiSyncToAnthropicJSON(openaiResp, origModel, thinkingEnabled):
    msgID = "msg_" + randomHex(12)
    
    choice = openaiResp.choices[0] ?? {}
    message = choice.message ?? {}
    content = []
    
    // Thinking block (if upstream provided reasoning_content)
    if thinkingEnabled and message.reasoning_content:
        content.push({type: "thinking", thinking: message.reasoning_content, signature: ""})
    
    // Text content
    if message.content:
        content.push({type: "text", text: message.content})
    
    // Tool calls
    if message.tool_calls:
        for tc in message.tool_calls:
            input = JSON.parse(tc.function.arguments) ?? {}
            content.push({
                type: "tool_use",
                id: tc.id,
                name: tc.function.name,
                input: input
            })
    
    // Stop reason mapping
    stopReason = "end_turn"
    switch choice.finish_reason:
        case "tool_calls": stopReason = "tool_use"
        case "length": stopReason = "max_tokens"
        case "stop": stopReason = "end_turn"
    
    return {
        id: msgID,
        type: "message",
        role: "assistant",
        content: content,
        model: origModel,
        stop_reason: stopReason,
        stop_sequence: null,
        usage: {
            input_tokens: openaiResp.usage?.prompt_tokens ?? 0,
            output_tokens: openaiResp.usage?.completion_tokens ?? 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0
        }
    }

OpenAI Stream to Anthropic Stream

This is the most complex conversion — translating OpenAI streaming chunks into Anthropic SSE events in real-time.

function streamOpenAIToAnthropic(upstreamSSE, writer, origModel, thinkingEnabled):
    msgID = "msg_" + randomHex(12)
    
    // State tracking
    blockIndex = 0
    textBlockStarted = false
    reasoningBlockStarted = false
    toolCallBlockStarted = {}    // openai_tc_index → bool
    toolCallToBlock = {}         // openai_tc_index → anthropic_block_index
    finishReason = ""
    outputTokens = 0
    
    // Send initial events
    emit("message_start", {
        type: "message_start",
        message: {
            id: msgID, type: "message", role: "assistant",
            content: [], model: origModel,
            stop_reason: null, stop_sequence: null,
            usage: {input_tokens: 0, output_tokens: 0,
                    cache_creation_input_tokens: 0, cache_read_input_tokens: 0}
        }
    })
    emit("ping", {type: "ping"})
    
    // Process each upstream chunk
    for chunk in upstreamSSE:
        delta = chunk.choices[0].delta
        
        // Capture finish_reason and usage (when the upstream includes them)
        if chunk.choices[0].finish_reason:
            finishReason = chunk.choices[0].finish_reason
        if chunk.usage:
            outputTokens = chunk.usage.completion_tokens
        
        // Handle reasoning_content (thinking)
        if delta.reasoning_content and thinkingEnabled:
            if not reasoningBlockStarted:
                emit("content_block_start", {
                    type: "content_block_start", index: blockIndex,
                    content_block: {type: "thinking", thinking: "", signature: ""}
                })
                reasoningBlockStarted = true
            emit("content_block_delta", {
                type: "content_block_delta", index: blockIndex,
                delta: {type: "thinking_delta", thinking: delta.reasoning_content}
            })
        
        // Handle text content
        if delta.content:
            // Close reasoning block first
            if reasoningBlockStarted and not textBlockStarted:
                emit("content_block_delta", {
                    type: "content_block_delta", index: blockIndex,
                    delta: {type: "signature_delta", signature: ""}
                })
                emit("content_block_stop", {type: "content_block_stop", index: blockIndex})
                blockIndex++
            
            if not textBlockStarted:
                emit("content_block_start", {
                    type: "content_block_start", index: blockIndex,
                    content_block: {type: "text", text: ""}
                })
                textBlockStarted = true
            
            emit("content_block_delta", {
                type: "content_block_delta", index: blockIndex,
                delta: {type: "text_delta", text: delta.content}
            })
        
        // Handle tool calls
        if delta.tool_calls:
            for tc in delta.tool_calls:
                tcIdx = tc.index
                
                if not toolCallBlockStarted[tcIdx]:
                    // Close text block before first tool call
                    if textBlockStarted and toolCallBlockStarted.isEmpty():
                        emit("content_block_stop", {type: "content_block_stop", index: blockIndex})
                        blockIndex++
                    
                    toolCallToBlock[tcIdx] = blockIndex
                    emit("content_block_start", {
                        type: "content_block_start", index: blockIndex,
                        content_block: {
                            type: "tool_use", id: tc.id,
                            name: tc.function.name, input: {}
                        }
                    })
                    toolCallBlockStarted[tcIdx] = true
                    
                    if tc.function.arguments:
                        emit("content_block_delta", {
                            type: "content_block_delta", index: blockIndex,
                            delta: {type: "input_json_delta", partial_json: tc.function.arguments}
                        })
                else:
                    bi = toolCallToBlock[tcIdx]
                    if tc.function.arguments:
                        emit("content_block_delta", {
                            type: "content_block_delta", index: bi,
                            delta: {type: "input_json_delta", partial_json: tc.function.arguments}
                        })
    
    // Close all open blocks
    if reasoningBlockStarted and not textBlockStarted and toolCallBlockStarted.isEmpty():
        emit("content_block_delta", {type: "content_block_delta", index: blockIndex, delta: {type: "signature_delta", signature: ""}})
        emit("content_block_stop", {type: "content_block_stop", index: blockIndex})
    
    if textBlockStarted and toolCallBlockStarted.isEmpty():
        emit("content_block_stop", {type: "content_block_stop", index: blockIndex})
    
    for tcIdx in toolCallBlockStarted:
        emit("content_block_stop", {type: "content_block_stop", index: toolCallToBlock[tcIdx]})
    
    // Stop reason mapping
    stopReason = "end_turn"
    switch finishReason:
        case "tool_calls": stopReason = "tool_use"
        case "length": stopReason = "max_tokens"
    
    emit("message_delta", {
        type: "message_delta",
        delta: {stop_reason: stopReason},
        usage: {output_tokens: outputTokens}
    })
    emit("message_stop", {type: "message_stop"})

Responses to Chat Completions Request

function translateResponsesInput(responsesPayload):
    chatPayload = {
        model: responsesPayload.model,
        stream: false,
        max_tokens: responsesPayload.max_output_tokens ?? 4096
    }
    
    if responsesPayload.temperature: chatPayload.temperature = responsesPayload.temperature
    if responsesPayload.top_p: chatPayload.top_p = responsesPayload.top_p
    
    messages = []
    
    // Instructions → system message
    if responsesPayload.instructions:
        messages.push({role: "system", content: responsesPayload.instructions})
    
    // Input handling
    if typeof responsesPayload.input == string:
        messages.push({role: "user", content: responsesPayload.input})
    
    else if typeof responsesPayload.input == array:
        for item in responsesPayload.input:
            role = item.role
            
            if typeof item.content == string:
                messages.push({role, content: item.content})
            
            else if typeof item.content == array:
                // Flatten input_text/output_text blocks to plain text
                textParts = []
                for block in item.content:
                    if block.type in ["input_text", "output_text", "text"]:
                        textParts.push(block.text)
                if textParts.length > 0:
                    messages.push({role, content: textParts.join("\n")})
    
    chatPayload.messages = messages
    
    // Tools passthrough
    if responsesPayload.tools: chatPayload.tools = responsesPayload.tools
    if responsesPayload.tool_choice: chatPayload.tool_choice = responsesPayload.tool_choice
    
    return chatPayload

Chat Completions Stream to Responses Stream

function streamChatToResponses(upstreamSSE, writer, origModel):
    respID = "resp_" + randomHex(12)
    msgID = "msg_" + randomHex(12)
    seqNum = 0
    created = now()
    
    // Helper: emit with auto-incrementing sequence_number
    function emit(eventType, data):
        data.sequence_number = seqNum++
        writeSSE(writer, eventType, data)
        flush()
    
    // Build the "in progress" response object with all required fields
    inProgressResp = {
        id: respID, object: "response", created_at: created,
        status: "in_progress", model: origModel,
        output: [], usage: null,
        error: null, incomplete_details: null, instructions: null,
        metadata: {}, parallel_tool_calls: true,
        temperature: 1.0, tool_choice: "auto", tools: [],
        top_p: 1.0, max_output_tokens: null, previous_response_id: null,
        reasoning: {effort: null, summary: null},
        store: true, truncation: "disabled", user: null
    }
    
    emit("response.created", {type: "response.created", response: inProgressResp})
    emit("response.in_progress", {type: "response.in_progress", response: inProgressResp})
    
    emit("response.output_item.added", {
        type: "response.output_item.added", output_index: 0,
        item: {type: "message", id: msgID, status: "in_progress",
               role: "assistant", content: []}
    })
    
    emit("response.content_part.added", {
        type: "response.content_part.added",
        item_id: msgID, output_index: 0, content_index: 0,
        part: {type: "output_text", text: "", annotations: []}
    })
    
    // Track state
    fullText = ""
    textBlockSent = false
    toolCallNames = {}  // index → name
    toolCallIDs = {}    // index → id
    toolCallArgs = {}   // index → accumulated arguments
    upstreamUsage = {input_tokens: 0, output_tokens: 0, total_tokens: 0}
    
    for chunk in upstreamSSE:
        // Capture usage from final chunk
        if chunk.usage:
            upstreamUsage = {
                input_tokens: chunk.usage.prompt_tokens,
                output_tokens: chunk.usage.completion_tokens,
                total_tokens: chunk.usage.total_tokens
            }
        
        delta = chunk.choices[0]?.delta
        if not delta: continue
        
        if delta.content:
            fullText += delta.content
            textBlockSent = true
            emit("response.output_text.delta", {
                type: "response.output_text.delta",
                item_id: msgID, output_index: 0, content_index: 0,
                delta: delta.content
            })
        
        // Accumulate tool calls
        if delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index
                if tc.id: toolCallIDs[idx] = tc.id
                if tc.function?.name: toolCallNames[idx] = tc.function.name
                if tc.function?.arguments: toolCallArgs[idx] = (toolCallArgs[idx] ?? "") + tc.function.arguments
    
    // Close text block
    allOutput = []
    outputIndex = 0
    
    if textBlockSent or fullText:
        emit("response.output_text.done", {
            type: "response.output_text.done",
            item_id: msgID, output_index: 0, content_index: 0,
            text: fullText
        })
        emit("response.content_part.done", {
            type: "response.content_part.done",
            item_id: msgID, output_index: 0, content_index: 0,
            part: {type: "output_text", text: fullText, annotations: []}
        })
        msgItem = {
            type: "message", id: msgID, status: "completed",
            role: "assistant",
            content: [{type: "output_text", text: fullText, annotations: []}]
        }
        emit("response.output_item.done", {
            type: "response.output_item.done", output_index: outputIndex, item: msgItem
        })
        allOutput.push(msgItem)
        outputIndex++
    
    // Emit function_call items
    for i in sorted(toolCallNames.keys()):
        fcID = "fc_" + randomHex(12)
        fcItem = {
            type: "function_call", id: fcID,
            call_id: toolCallIDs[i], name: toolCallNames[i],
            arguments: toolCallArgs[i], status: "completed"
        }
        emit("response.output_item.added", {
            type: "response.output_item.added", output_index: outputIndex, item: fcItem
        })
        emit("response.function_call_arguments.done", {
            type: "response.function_call_arguments.done",
            item_id: fcID, output_index: outputIndex,
            arguments: toolCallArgs[i]
        })
        emit("response.output_item.done", {
            type: "response.output_item.done", output_index: outputIndex, item: fcItem
        })
        allOutput.push(fcItem)
        outputIndex++
    
    // If nothing output, send empty message
    if allOutput.isEmpty():
        emptyItem = {type: "message", id: msgID, status: "completed",
                     role: "assistant",
                     content: [{type: "output_text", text: "", annotations: []}]}
        emit("response.output_item.done", {type: "response.output_item.done", output_index: 0, item: emptyItem})
        allOutput.push(emptyItem)
    
    // Final event
    completedResp = copy(inProgressResp)
    completedResp.status = "completed"
    completedResp.output = allOutput
    completedResp.usage = upstreamUsage
    
    emit("response.completed", {type: "response.completed", response: completedResp})

Chat Completions to Responses Response

Non-streaming conversion:

function openaiChatToResponsesJSON(openaiResp, origModel):
    respID = "resp_" + randomHex(12)
    msgID = "msg_" + randomHex(12)
    
    choice = openaiResp.choices[0]
    message = choice?.message ?? {}
    text = message.content ?? ""
    
    output = []
    
    // Text message
    if text:
        output.push({
            type: "message", id: msgID, status: "completed",
            role: "assistant",
            content: [{type: "output_text", text: text, annotations: []}]
        })
    
    // Function calls
    if message.tool_calls:
        for tc in message.tool_calls:
            output.push({
                type: "function_call",
                id: "fc_" + randomHex(12),
                call_id: tc.id,
                name: tc.function.name,
                arguments: tc.function.arguments,
                status: "completed"
            })
    
    // Empty fallback
    if output.isEmpty():
        output.push({
            type: "message", id: msgID, status: "completed",
            role: "assistant",
            content: [{type: "output_text", text: "", annotations: []}]
        })
    
    return {
        id: respID,
        object: "response",
        created_at: openaiResp.created ?? now(),
        status: "completed",
        model: origModel,
        output: output,
        usage: {
            input_tokens: openaiResp.usage?.prompt_tokens ?? 0,
            output_tokens: openaiResp.usage?.completion_tokens ?? 0,
            total_tokens: openaiResp.usage?.total_tokens ?? 0
        }
    }

Tool Format Conversion

Anthropic tools → OpenAI tools

Anthropic:
{name: "get_weather", description: "Get weather", input_schema: {type: "object", properties: {...}}}

OpenAI:
{type: "function", function: {name: "get_weather", description: "Get weather", parameters: {type: "object", properties: {...}}}}

Mapping: input_schema → function.parameters

OpenAI tool_calls → Anthropic tool_use

OpenAI (in message):
{tool_calls: [{id: "call_abc", type: "function", function: {name: "get_weather", arguments: '{"location":"SF"}'}}]}

Anthropic (in content):
{type: "tool_use", id: "call_abc", name: "get_weather", input: {location: "SF"}}

Mapping: JSON.parse(arguments) → input

Anthropic tool_result → OpenAI tool message

Anthropic (in content):
{type: "tool_result", tool_use_id: "call_abc", content: "72°F and sunny"}

OpenAI (separate message):
{role: "tool", tool_call_id: "call_abc", content: "72°F and sunny"}

Anthropic tool_choice → OpenAI tool_choice

Anthropic                 | OpenAI
{type: "auto"}            | "auto"
{type: "any"}             | "required"
{type: "none"}            | "none"
{type: "tool", name: "X"} | {type: "function", function: {name: "X"}}

SSE Wire Format

Detailed byte-level format requirements for Server-Sent Events across all three LLM API protocols.

Table of Contents

  1. General SSE Rules
  2. OpenAI Chat Completions SSE
  3. Anthropic Messages SSE
  4. OpenAI Responses API SSE
  5. Keepalive and Timeout
  6. Common Pitfalls

General SSE Rules

SSE is a web standard (originally W3C, now maintained in the WHATWG HTML spec). Each event is:

[event: <type>\n]
data: <payload>\n
\n

Key rules:

  • Lines end with \n (LF), not \r\n
  • Each event terminates with a blank line (\n\n)
  • event: line is optional (OpenAI Chat omits it; Anthropic and Responses use it)
  • data: can span multiple lines: each line starts with data:
  • A comment line starts with : — used for keepalive (: keepalive\n\n)
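
A minimal line-level parser sketch in Python that follows these rules; it assumes an iterable of already-decoded lines with trailing newlines stripped:

def parse_sse(lines):
    # Yields (event_type, data) pairs. Comment lines (keepalives) are skipped;
    # multi-line data fields are rejoined with "\n" per the rules above.
    event, data = None, []
    for line in lines:
        if line == "":                        # blank line terminates an event
            if data:
                yield (event or "message", "\n".join(data))
            event, data = None, []
        elif line.startswith(":"):            # comment / keepalive
            continue
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The spec strips at most one leading space after the colon.
            data.append(value[1:] if value.startswith(" ") else value)
    # Per spec, an event not followed by a blank line is discarded at end of stream.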

Flushing

After writing each SSE event to the response writer, you must flush the buffer. Without flushing, events accumulate in the server/framework's output buffer and the client sees nothing until the buffer fills or the stream ends.

  • In Go: flusher.Flush() via the http.Flusher interface
  • In Python (FastAPI/Starlette): yield in a StreamingResponse auto-flushes
  • In Node.js (Express): res.flush() or res.flushHeaders() + res.write()
  • In Rust (axum): use axum::response::Sse, which auto-flushes

Headers

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no

X-Accel-Buffering: no is critical when behind nginx — without it, nginx buffers the entire response before forwarding.


OpenAI Chat Completions SSE

Format: data: <JSON>\n\n (no event: field)

Wire example (raw bytes)

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{"role":"assistant"},"logprobs":null,"finish_reason":null}]}\n
\n
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}]}\n
\n
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{"content":" there!"},"logprobs":null,"finish_reason":null}]}\n
\n
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":8,"completion_tokens":3,"total_tokens":11}}\n
\n
data: [DONE]\n
\n

Chunk fields

Field                       | First chunk             | Middle chunks       | Final chunk
id                          | Present                 | Same value          | Same value
object                      | "chat.completion.chunk" | Same                | Same
created                     | Unix timestamp          | Same                | Same
model                       | Model ID                | Same                | Same
system_fingerprint          | Optional                | Same                | Same
choices[0].index            | 0                       | 0                   | 0
choices[0].delta.role       | "assistant"             | Absent              | Absent
choices[0].delta.content    | Absent                  | Text fragment       | Absent
choices[0].delta.tool_calls | Absent                  | Tool call fragments | Absent
choices[0].logprobs         | null                    | null                | null
choices[0].finish_reason    | null                    | null                | "stop" / "tool_calls" / "length"
usage                       | Absent                  | Absent              | Token counts (optional)

Tool call streaming detail

Tool calls arrive incrementally across multiple chunks:

# Chunk 1: tool call header (id, name, empty arguments)
delta.tool_calls: [{"index":0,"id":"call_abc","type":"function","function":{"name":"get_weather","arguments":""}}]

# Chunk 2-N: argument fragments
delta.tool_calls: [{"index":0,"function":{"arguments":"{\"loc"}}]
delta.tool_calls: [{"index":0,"function":{"arguments":"ation\":"}}]
delta.tool_calls: [{"index":0,"function":{"arguments":" \"SF\"}"}}]

# Final chunk
delta: {}, finish_reason: "tool_calls"

Accumulate arguments fragments per tool call index. The id and name only appear once.


Anthropic Messages SSE

Format: event: <type>\ndata: <JSON>\n\n

Complete event sequence

event: message_start\n
data: {"type":"message_start","message":{...}}\n
\n
event: ping\n
data: {"type":"ping"}\n
\n
event: content_block_start\n
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n
\n
event: content_block_delta\n
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n
\n
event: content_block_stop\n
data: {"type":"content_block_stop","index":0}\n
\n
event: message_delta\n
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":15}}\n
\n
event: message_stop\n
data: {"type":"message_stop"}\n
\n

Event type reference

Event               | Payload                                                       | When
message_start       | Full message object (empty content, usage with input_tokens) | First event
ping                | {"type":"ping"}                                               | After message_start
content_block_start | Block type + index                                            | Before each content block
content_block_delta | Delta for current block                                       | During content generation
content_block_stop  | Block index                                                   | After each content block
message_delta       | stop_reason + output usage                                    | After all blocks
message_stop        | {"type":"message_stop"}                                       | Final event

Content block types and their deltas

Block type       | Start payload                                          | Delta type       | Delta payload
text             | {"type":"text","text":""}                              | text_delta       | {"type":"text_delta","text":"..."}
thinking         | {"type":"thinking","thinking":"","signature":""}       | thinking_delta   | {"type":"thinking_delta","thinking":"..."}
thinking (close) | —                                                      | signature_delta  | {"type":"signature_delta","signature":""}
tool_use         | {"type":"tool_use","id":"...","name":"...","input":{}} | input_json_delta | {"type":"input_json_delta","partial_json":"..."}

Block ordering rules

  1. Thinking blocks come first (if thinking enabled)
  2. Text blocks come after thinking
  3. Tool use blocks come after text
  4. Each block increments the index
  5. Close reasoning block (signature_delta + content_block_stop) before starting text block

OpenAI Responses API SSE

Format: event: <type>\ndata: <JSON>\n\n

This is the strictest format. Every event must include sequence_number.

Complete event sequence for text response

event: response.created\n
data: {"type":"response.created","sequence_number":0,"response":{...full response object...}}\n
\n
event: response.in_progress\n
data: {"type":"response.in_progress","sequence_number":1,"response":{...same...}}\n
\n
event: response.output_item.added\n
data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"type":"message","id":"msg_abc","status":"in_progress","role":"assistant","content":[]}}\n
\n
event: response.content_part.added\n
data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"","annotations":[]}}\n
\n
event: response.output_text.delta\n
data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_abc","output_index":0,"content_index":0,"delta":"Hello"}\n
\n
event: response.output_text.done\n
data: {"type":"response.output_text.done","sequence_number":N,"item_id":"msg_abc","output_index":0,"content_index":0,"text":"Hello there!"}\n
\n
event: response.content_part.done\n
data: {"type":"response.content_part.done","sequence_number":N+1,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello there!","annotations":[]}}\n
\n
event: response.output_item.done\n
data: {"type":"response.output_item.done","sequence_number":N+2,"output_index":0,"item":{...completed item...}}\n
\n
event: response.completed\n
data: {"type":"response.completed","sequence_number":N+3,"response":{...completed response...}}\n
\n

Response object — required fields

The response object in response.created and response.completed must include all these fields (use null for absent values):

{
  "id": "resp_abc",
  "object": "response",
  "created_at": 1700000000,
  "status": "in_progress",
  "model": "claude-sonnet-4-6",
  "output": [],
  "usage": null,
  "error": null,
  "incomplete_details": null,
  "instructions": null,
  "metadata": {},
  "parallel_tool_calls": true,
  "temperature": 1.0,
  "tool_choice": "auto",
  "tools": [],
  "top_p": 1.0,
  "max_output_tokens": null,
  "previous_response_id": null,
  "reasoning": {"effort": null, "summary": null},
  "store": true,
  "truncation": "disabled",
  "user": null
}

Event field matrix

Event                                  | sequence_number | item_id | output_index | content_index
response.created                       | Yes             | No      | No           | No
response.in_progress                   | Yes             | No      | No           | No
response.output_item.added             | Yes             | No      | Yes          | No
response.content_part.added            | Yes             | Yes     | Yes          | Yes
response.output_text.delta             | Yes             | Yes     | Yes          | Yes
response.output_text.done              | Yes             | Yes     | Yes          | Yes
response.content_part.done             | Yes             | Yes     | Yes          | Yes
response.output_item.done              | Yes             | No      | Yes          | No
response.function_call_arguments.delta | Yes             | Yes     | Yes          | No
response.function_call_arguments.done  | Yes             | Yes     | Yes          | No
response.completed                     | Yes             | No      | No           | No

Keepalive and Timeout

For long-running requests (e.g., complex reasoning), the stream may go silent for 30+ seconds. Without keepalive, reverse proxies (nginx, Cloudflare) may close the connection.

SSE comment keepalive

Send a comment every 5-15 seconds during silent periods:

: keepalive\n
\n

This is a valid SSE comment (starts with :) — clients ignore it, but it keeps the TCP connection alive.

Implementation pattern

start keepalive timer (every 5s):
    write ": keepalive\n\n"
    flush

on each real SSE event:
    reset keepalive timer

on stream end:
    stop keepalive timer
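
An asyncio rendering of this pattern (a sketch, and one of several ways to interleave keepalives; events is assumed to yield ready-to-write SSE byte strings):

import asyncio

async def with_keepalive(events, interval=5.0):
    # Wrap an async iterator of SSE byte strings, yielding a keepalive
    # comment whenever `interval` seconds pass without a real event.
    it = events.__aiter__()
    while True:
        nxt = asyncio.ensure_future(it.__anext__())
        while True:
            done, _ = await asyncio.wait({nxt}, timeout=interval)
            if done:
                break
            yield b": keepalive\n\n"   # silence: keep proxies from closing us
        try:
            yield nxt.result()          # a real event; the timer restarts next loop
        except StopAsyncIteration:
            return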

Client disconnect handling

When the client disconnects mid-stream:

  • Detect via write error or context cancellation
  • Stop the keepalive timer
  • Close the upstream connection (don't waste upstream tokens)
  • Clean up resources

Common Pitfalls

1. Missing double newline

Each SSE event ends with \n\n. A single \n means the event is not yet complete — the client will buffer and wait for more data.

2. No flush after write

Writing to the response writer doesn't mean the data reaches the client. You must flush after each event.

3. Writing headers after first write

HTTP headers must be written before the first body byte. If you call http.Error() after streaming has started, it becomes part of the body and corrupts the SSE stream.

4. Buffer size too small

Scanner/reader buffers must be large enough for the largest possible SSE line. A single chunk with a large tool call argument can exceed 64KB. Use at least 1MB buffer.

5. JSON in SSE data field

The JSON must be on a single line (no pretty-printing). Newlines in JSON would be interpreted as new SSE fields.

6. Forgetting data: [DONE] (OpenAI Chat)

Some clients (like OpenAI's own SDK) wait for [DONE] to finalize the response. Without it, the client may hang or timeout.

7. Inconsistent id across chunks (OpenAI Chat)

All chunks in one response must share the same id. Generating a new ID per chunk breaks clients that group chunks by ID.

8. Missing event: field (Anthropic/Responses)

Unlike OpenAI Chat (which only uses data:), Anthropic and Responses formats require the event: field. Without it, clients can't dispatch events by type.
