LLM API Adapter Layer SOP — Three-Protocol Conversion (OpenAI / Anthropic / Responses) + SSE Streaming
Scope: building a proxy that exposes the OpenAI, Anthropic, and Responses protocols on top of any LLM backend; handling SSE streaming, account-pool retry, Cherry Studio empty responses, and more.
Core principles: one independent handler per protocol, sharing a single account pool and a 3-tier error classification; every SSE event must be flushed and terminated with a `\n\n` double newline.
Part 1: Core SOP
This skill is a battle-tested reference for building proxy/adapter layers that expose three industry-standard LLM API interfaces on top of any LLM backend. It is language-agnostic — the patterns apply equally to Go, Python, TypeScript, Rust, or any HTTP server.
The three interfaces:
- OpenAI Chat Completions (`POST /v1/chat/completions`) — the most widely supported format
- Anthropic Messages (`POST /v1/messages`) — native Claude API format
- OpenAI Responses (`POST /v1/responses`) — newer OpenAI format with strict SSE requirements
All three share a common architecture: receive client request → translate to upstream format → call upstream → translate response back → return to client. The devil is in the SSE streaming details.
Architecture Overview
Client Request (any of 3 formats)
→ Route to handler based on path
→ Validate auth (Bearer token)
→ Translate request to upstream format
→ Select account/credential from pool (LRU or highest-balance)
→ Send to upstream LLM backend
→ On error: classify → retry with next account or return to client
→ On success: translate response back to client's format
→ Stream SSE events (or return JSON for non-streaming)
→ Client receives response
Endpoints to implement
| Path | Method | Format | Auth Header |
|---|---|---|---|
| `/v1/models` | GET | OpenAI | `Authorization: Bearer <key>` |
| `/v1/chat/completions` | POST | OpenAI Chat | `Authorization: Bearer <key>` |
| `/v1/messages` | POST | Anthropic Messages | `Authorization: Bearer <key>` or `x-api-key: <key>` |
| `/v1/responses` | POST | OpenAI Responses | `Authorization: Bearer <key>` |
HTTP Headers for SSE streaming
Every streaming endpoint must set these headers before writing any body:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Additionally, set X-Accel-Buffering: no if behind nginx to prevent buffering.
CORS
Allow all origins for API proxy use:
Access-Control-Allow-Origin: *
Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS
Access-Control-Allow-Headers: Content-Type, Authorization, X-API-Key
Handle OPTIONS preflight with 200 and no body.
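A minimal, framework-agnostic sketch of these headers as Python constants; `apply_stream_headers` and its dict-like `response.headers` are illustrative stand-ins for whatever your framework provides:

```python
# SSE headers: must be set before the first body byte is written.
SSE_HEADERS = {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",  # stops nginx from buffering the stream
}

# CORS headers for API-proxy use; OPTIONS preflight returns 200 with no body.
CORS_HEADERS = {
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
    "Access-Control-Allow-Headers": "Content-Type, Authorization, X-API-Key",
}

def apply_stream_headers(response):
    # `response.headers` is assumed to be dict-like (framework-dependent).
    response.headers.update(SSE_HEADERS)
    response.headers.update(CORS_HEADERS)
```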
1. OpenAI Chat Completions (/v1/chat/completions)
This is the simplest interface — if upstream is already OpenAI-compatible, it's mostly passthrough.
Request format
{
"model": "claude-sonnet-4-6",
"messages": [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
{"role": "user", "content": "How are you?"}
],
"stream": true,
"max_tokens": 4096,
"temperature": 0.7,
"tools": [...],
"tool_choice": "auto"
}
Streaming SSE format
Each event is `data: <JSON>\n\n`. The stream ends with `data: [DONE]\n\n`.
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1700000000,"model":"claude-sonnet-4-6","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1700000000,"model":"claude-sonnet-4-6","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1700000000,"model":"claude-sonnet-4-6","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":5,"total_tokens":15}}
data: [DONE]
Key fields in each chunk:
- `id`: consistent across all chunks in one response (format: `chatcmpl-<random>`)
- `object`: always `"chat.completion.chunk"`
- `choices[0].delta.role`: only in the first chunk (`"assistant"`)
- `choices[0].delta.content`: text content (empty string or absent)
- `choices[0].finish_reason`: `null` until the final chunk, then `"stop"`, `"tool_calls"`, or `"length"`
- `usage`: only in the final chunk (some upstreams omit this)
Non-streaming response
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1700000000,
"model": "claude-sonnet-4-6",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": "Hello!"},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
}
Tool calls in streaming
Tool calls arrive as incremental chunks in delta.tool_calls:
data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_abc","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}
data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"loc"}}]}}]}
data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ation\": \"SF\"}"}}]}}]}
The id and name only appear in the first chunk for that tool call index. Subsequent chunks only have arguments fragments. Accumulate them client-side.
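A client-side sketch of that accumulation, assuming `chunks` is an iterable of parsed chunk dicts in the wire shape shown above (the helper name is ours):

```python
def accumulate_tool_calls(chunks):
    """Fold streamed tool_call fragments into complete calls, keyed by index."""
    calls = {}
    for chunk in chunks:
        choices = chunk.get("choices") or []
        if not choices:
            continue
        for tc in choices[0].get("delta", {}).get("tool_calls") or []:
            slot = calls.setdefault(tc["index"], {"id": "", "name": "", "arguments": ""})
            if tc.get("id"):
                slot["id"] = tc["id"]        # id appears only in the first fragment
            fn = tc.get("function") or {}
            if fn.get("name"):
                slot["name"] = fn["name"]    # name appears only in the first fragment
            slot["arguments"] += fn.get("arguments") or ""
    return calls
```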
2. Anthropic Messages (/v1/messages)
Request format
{
"model": "claude-sonnet-4-6",
"system": "You are helpful.",
"messages": [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": [
{"type": "text", "text": "Hi!"}
]},
{"role": "user", "content": [
{"type": "text", "text": "How are you?"}
]}
],
"max_tokens": 4096,
"stream": true,
"thinking": {"type": "enabled", "budget_tokens": 10000},
"tools": [{"name": "get_weather", "description": "...", "input_schema": {...}}],
"tool_choice": {"type": "auto"}
}
Key differences from OpenAI:
- `system` is a top-level field (string or array of `{type: "text", text: "..."}` blocks)
- `content` can be a string or an array of typed blocks (`text`, `image`, `tool_use`, `tool_result`, `thinking`)
- `tools[].input_schema` instead of `tools[].function.parameters`
- `tool_choice` is `{type: "auto"|"any"|"tool"|"none"}`, not a string
- `thinking` field for extended thinking mode
Streaming SSE format — FULL EVENT SEQUENCE
The Anthropic SSE format uses event: <type>\ndata: <JSON>\n\n. The event sequence is strict:
event: message_start
data: {"type":"message_start","message":{"id":"msg_abc","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-6","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0}}}
event: ping
data: {"type":"ping"}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" there!"}}
event: content_block_stop
data: {"type":"content_block_stop","index":0}
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":15}}
event: message_stop
data: {"type":"message_stop"}
Critical details:
- `ping` event after `message_start` — Claude Code and other clients expect this
- `content_block_start` → multiple `content_block_delta` → `content_block_stop` per block
- Block `index` increments for each content block (text, thinking, tool_use)
- `message_delta` carries the final `stop_reason` and output usage
- `message_stop` is the terminator (no `[DONE]` marker)
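A minimal Python emit helper for events in this shape; it assumes `writer` is a buffered binary stream exposing `write()` and `flush()`, and is a sketch rather than a specific framework's API:

```python
import json

def emit(writer, event_type, payload):
    """Write one `event:` + `data:` SSE event and flush immediately."""
    writer.write(f"event: {event_type}\n".encode())
    writer.write(f"data: {json.dumps(payload, separators=(',', ':'))}\n\n".encode())
    writer.flush()  # without the flush, events sit in the output buffer
```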
Thinking blocks (extended thinking)
When thinking is enabled, a thinking block comes before the text block:
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":"","signature":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"Let me think..."}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":""}}
event: content_block_stop
data: {"type":"content_block_stop","index":0}
event: content_block_start
data: {"type":"content_block_start","index":1,"content_block":{"type":"text","text":""}}
The signature_delta must be sent before closing the thinking block, even if the signature is empty.
Tool use in streaming
event: content_block_start
data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_abc","name":"get_weather","input":{}}}
event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":"{\"location\":"}}
event: content_block_delta
data: {"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":" \"SF\"}"}}
event: content_block_stop
data: {"type":"content_block_stop","index":1}
When tool use is the stop reason: stop_reason = "tool_use" in message_delta.
Non-streaming response
{
"id": "msg_abc",
"type": "message",
"role": "assistant",
"content": [
{"type": "text", "text": "Hello!"}
],
"model": "claude-sonnet-4-6",
"stop_reason": "end_turn",
"stop_sequence": null,
"usage": {
"input_tokens": 10,
"output_tokens": 5,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0
}
}
Stop reason mapping (OpenAI ↔ Anthropic)
| OpenAI `finish_reason` | Anthropic `stop_reason` |
|---|---|
| `"stop"` | `"end_turn"` |
| `"tool_calls"` | `"tool_use"` |
| `"length"` | `"max_tokens"` |
3. OpenAI Responses API (/v1/responses)
This is the strictest SSE format. Clients like Cherry Studio validate every field — missing sequence_number or item_id causes empty renders.
Request format
{
"model": "claude-sonnet-4-6",
"input": "Hello",
"stream": true,
"max_output_tokens": 4096,
"instructions": "You are helpful.",
"temperature": 0.7,
"tools": [...],
"tool_choice": "auto"
}
`input` can be:
- A string (a simple prompt, converted to a single user message)
- An array of message objects with `role` and `content` (content blocks use `input_text` / `output_text` types)
Streaming SSE format — COMPLETE EVENT SEQUENCE
Every event has event: <type>\ndata: <JSON>\n\n. Every data payload must include sequence_number (incrementing integer starting at 0).
event: response.created
data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_abc","object":"response","created_at":1700000000,"status":"in_progress","model":"claude-sonnet-4-6","output":[],"usage":null,"error":null,"incomplete_details":null,"instructions":null,"metadata":{},"parallel_tool_calls":true,"temperature":1.0,"tool_choice":"auto","tools":[],"top_p":1.0,"max_output_tokens":null,"previous_response_id":null,"reasoning":{"effort":null,"summary":null},"store":true,"truncation":"disabled","user":null}}
event: response.in_progress
data: {"type":"response.in_progress","sequence_number":1,"response":{...same as above...}}
event: response.output_item.added
data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"type":"message","id":"msg_abc","status":"in_progress","role":"assistant","content":[]}}
event: response.content_part.added
data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"","annotations":[]}}
event: response.output_text.delta
data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_abc","output_index":0,"content_index":0,"delta":"Hello"}
event: response.output_text.delta
data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_abc","output_index":0,"content_index":0,"delta":" there!"}
event: response.output_text.done
data: {"type":"response.output_text.done","sequence_number":6,"item_id":"msg_abc","output_index":0,"content_index":0,"text":"Hello there!"}
event: response.content_part.done
data: {"type":"response.content_part.done","sequence_number":7,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello there!","annotations":[]}}
event: response.output_item.done
data: {"type":"response.output_item.done","sequence_number":8,"output_index":0,"item":{"type":"message","id":"msg_abc","status":"completed","role":"assistant","content":[{"type":"output_text","text":"Hello there!","annotations":[]}]}}
event: response.completed
data: {"type":"response.completed","sequence_number":9,"response":{"id":"resp_abc","object":"response","created_at":1700000000,"status":"completed","model":"claude-sonnet-4-6","output":[...],"usage":{"input_tokens":10,"output_tokens":5,"total_tokens":15},"error":null,...all other fields...}}
Mandatory fields clients validate
These are the fields that cause empty responses in strict clients if missing:
| Field | Where | Why |
|---|---|---|
| `sequence_number` | Every SSE event | Clients use this for ordering and deduplication |
| `item_id` | `content_part.added`, `output_text.delta`, `output_text.done`, `content_part.done` | Associates content with its parent message item |
| `annotations` | Inside `part` objects (`output_text`) | Array (can be empty `[]`) — missing = parse failure |
| `response.in_progress` event | After `response.created` | Some clients wait for this before processing content |
| Full response metadata | In `response.created` and `response.completed` | Fields like `metadata`, `tools`, `reasoning`, `temperature`, etc. |
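Because every event must carry `sequence_number`, it helps to centralize the stamping so no code path can forget it. A minimal sketch, with the same buffered-binary-stream assumption for `writer` as elsewhere in this document:

```python
import itertools
import json

def make_responses_emitter(writer):
    """Returns an emit(event_type, payload) closure that stamps an
    auto-incrementing sequence_number (starting at 0) on every payload."""
    seq = itertools.count()
    def emit(event_type, payload):
        payload["sequence_number"] = next(seq)
        writer.write(f"event: {event_type}\n".encode())
        writer.write(f"data: {json.dumps(payload, separators=(',', ':'))}\n\n".encode())
        writer.flush()  # flush per event so deltas reach the client immediately
    return emit
```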
Tool calls (function_call) in streaming
Tool calls in Responses API are separate function_call output items (not embedded in a message):
event: response.output_item.added
data: {"type":"response.output_item.added","sequence_number":N,"output_index":1,"item":{"type":"function_call","id":"fc_abc","call_id":"call_abc","name":"get_weather","arguments":"","status":"in_progress"}}
event: response.function_call_arguments.delta
data: {"type":"response.function_call_arguments.delta","sequence_number":N+1,"item_id":"fc_abc","output_index":1,"delta":"{\"location\":"}
event: response.function_call_arguments.done
data: {"type":"response.function_call_arguments.done","sequence_number":N+2,"item_id":"fc_abc","output_index":1,"arguments":"{\"location\": \"SF\"}"}
event: response.output_item.done
data: {"type":"response.output_item.done","sequence_number":N+3,"output_index":1,"item":{"type":"function_call","id":"fc_abc","call_id":"call_abc","name":"get_weather","arguments":"{\"location\": \"SF\"}","status":"completed"}}
Non-streaming response
{
"id": "resp_abc",
"object": "response",
"created_at": 1700000000,
"status": "completed",
"model": "claude-sonnet-4-6",
"output": [
{
"type": "message",
"id": "msg_abc",
"status": "completed",
"role": "assistant",
"content": [{"type": "output_text", "text": "Hello!", "annotations": []}]
}
],
"usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
}
Format Conversion Rules
When your upstream speaks one format and your client speaks another, you need bidirectional conversion.
Anthropic → OpenAI (request translation)
| Anthropic | OpenAI |
|---|---|
| `system` (string or block array) | `messages[0]` with `role: "system"` |
| `messages[].content` (block array) | Flatten: text blocks → `{type:"text", text:...}`, image blocks → `{type:"image_url", image_url:{url:...}}` |
| `tool_use` content blocks | Assistant message's `tool_calls` array: `{id, type:"function", function:{name, arguments: JSON.stringify(input)}}` |
| `tool_result` content blocks | Separate `{role:"tool", tool_call_id, content}` message |
| `thinking` / `redacted_thinking` blocks | Skip (upstream typically doesn't support them) |
| `tools[].input_schema` | `tools[].function.parameters` |
| `tool_choice: {type:"auto"}` | `tool_choice: "auto"` |
| `tool_choice: {type:"any"}` | `tool_choice: "required"` |
| `tool_choice: {type:"tool", name:"X"}` | `tool_choice: {type:"function", function:{name:"X"}}` |
| `tool_choice: {type:"none"}` | `tool_choice: "none"` |
| `max_tokens` | `max_tokens` (pass through) |
OpenAI → Anthropic (response translation)
| OpenAI Response | Anthropic Response |
|---|---|
| `choices[0].message.content` | `content: [{type:"text", text:...}]` |
| `choices[0].message.tool_calls` | `content: [{type:"tool_use", id, name, input: JSON.parse(arguments)}]` |
| `finish_reason: "stop"` | `stop_reason: "end_turn"` |
| `finish_reason: "tool_calls"` | `stop_reason: "tool_use"` |
| `finish_reason: "length"` | `stop_reason: "max_tokens"` |
| `usage.prompt_tokens` | `usage.input_tokens` |
| `usage.completion_tokens` | `usage.output_tokens` |
Responses API → Chat Completions (request translation)
| Responses API | Chat Completions |
|---|---|
| `input` (string) | `messages: [{role:"user", content: input}]` |
| `input` (array of message objects) | Convert each: `input_text` / `output_text` blocks → plain text |
| `instructions` | `messages[0]` with `role: "system"` |
| `max_output_tokens` | `max_tokens` |
| `tools` / `tool_choice` | Pass through (same format) |
Chat Completions → Responses API (response translation)
| Chat Completions Response | Responses API Response |
|---|---|
| `choices[0].message.content` | `output: [{type:"message", content:[{type:"output_text", text:..., annotations:[]}]}]` |
| `choices[0].message.tool_calls` | `output: [{type:"function_call", id, call_id, name, arguments, status:"completed"}]` |
| `usage.prompt_tokens` | `usage.input_tokens` |
| `usage.completion_tokens` | `usage.output_tokens` |
Error Handling & Account Pool Retry
When proxying through a pool of accounts/credentials, errors need a 3-tier classification to decide between "return to client" vs "retry with next account" vs "disable account and retry."
3-Tier Error Classification
Upstream returns non-200
│
├─ Tier 1: Request Too Large (403 + "estimated cost")
│ → Return error to client immediately
│ → No account is at fault; the request exceeds any account's limit
│
├─ Tier 2: Insufficient Tokens (403 + "insufficient tokens"|"upgrade your plan"|"limit reached")
│ → Retry with next account (don't disable current one)
│ → This account might recover; another might have enough
│
├─ Tier 3: Account Exhausted (429 or 402)
│ → Disable account, retry with next
│ → Account is definitively out of quota
│
├─ Auth Invalid (401)
│ → Disable account, retry with next
│ → Token expired or revoked
│
└─ Other errors
→ Return error to client
→ Unknown issue, don't mask it
Retry loop pattern (pseudocode)
MAX_RETRIES = 10
tried_ids = []
for attempt in range(MAX_RETRIES):
account = get_active_account(exclude=tried_ids)
if not account:
return 503 "No active accounts available"
tried_ids.append(account.id)
mark_account_used(account.id)
response = call_upstream(account.api_key, request_payload)
if response.status == 200:
stream_or_return(response)
return
error_body = response.body
if is_request_too_large(response.status, error_body):
return error_to_client(response.status, error_body) # No retry
if is_insufficient_tokens(response.status, error_body):
continue # Try next account, don't disable
if is_token_exhausted(response.status, error_body):
disable_account(account.id)
continue # Try next account
if response.status == 401:
disable_account(account.id)
continue # Try next account
# Unknown error — return to client
return error_to_client(response.status, error_body)
return 503 "All accounts exhausted"
This pattern must be applied identically across all 6 code paths (3 interfaces x streaming/non-streaming).
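The retry pseudocode leaves its three classifiers undefined. A Python sketch follows; the matched substrings mirror the tier definitions above, but real upstreams may phrase errors differently, so treat the patterns as assumptions to adapt:

```python
# Hypothetical classifier helpers for the retry loop above.
def is_request_too_large(status: int, body: str) -> bool:
    # Tier 1: nothing to retry; the request exceeds every account's limit
    return status == 403 and "estimated cost" in body

def is_insufficient_tokens(status: int, body: str) -> bool:
    # Tier 2: retry with the next account, keep this one enabled
    markers = ("insufficient tokens", "upgrade your plan", "limit reached")
    return status == 403 and any(m in body for m in markers)

def is_token_exhausted(status: int, body: str) -> bool:
    # Tier 3: disable the account, then retry with the next one
    return status in (429, 402)
```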
Proxy failure handling
If the upstream request fails at the network level (connection refused, timeout):
- Mark the proxy as failed (if using proxy rotation)
- Retry with next account (which may get a different proxy)
- Don't disable the account — it wasn't the account's fault
Anthropic Compatibility Edge Cases
These are real-world issues that cause 400 errors or broken streams if not handled.
Orphaned tool_use blocks
When clients truncate conversation history, an assistant message may contain tool_use blocks without a corresponding tool_result in the next user message. Anthropic's API rejects this. Fix by scanning messages and inserting synthetic tool_result blocks:
{
"type": "tool_result",
"tool_use_id": "<orphaned_id>",
"content": "[Tool result unavailable - conversation history was truncated]",
"is_error": true
}
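A Python sketch of that repair pass, assuming `messages` is the parsed Anthropic `messages` array (the helper name is ours, and it covers the common content shapes only):

```python
def repair_orphaned_tool_use(messages):
    """Answer every orphaned tool_use with a synthetic tool_result so the
    Anthropic-format upstream doesn't reject the request with a 400."""
    for i, msg in enumerate(messages):
        if msg["role"] != "assistant" or not isinstance(msg.get("content"), list):
            continue
        tool_ids = [b["id"] for b in msg["content"] if b.get("type") == "tool_use"]
        if not tool_ids:
            continue
        nxt = messages[i + 1] if i + 1 < len(messages) else None
        answered = set()
        if nxt and nxt["role"] == "user" and isinstance(nxt.get("content"), list):
            answered = {b.get("tool_use_id") for b in nxt["content"]
                        if b.get("type") == "tool_result"}
        synthetic = [{
            "type": "tool_result",
            "tool_use_id": tid,
            "content": "[Tool result unavailable - conversation history was truncated]",
            "is_error": True,
        } for tid in tool_ids if tid not in answered]
        if not synthetic:
            continue
        if nxt and nxt["role"] == "user" and isinstance(nxt.get("content"), list):
            nxt["content"] = synthetic + nxt["content"]  # tool_result blocks come first
        elif nxt and nxt["role"] == "user":
            # string content: promote to a block list so results can be prepended
            nxt["content"] = synthetic + [{"type": "text", "text": nxt["content"]}]
        else:
            # inserting here is safe: the new user message is skipped on the next pass
            messages.insert(i + 1, {"role": "user", "content": synthetic})
    return messages
```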
cache_control stripping
If the upstream doesn't support Anthropic's prompt caching, strip cache_control fields from:
- `system` blocks
- `messages[].content` blocks
- `tools` array items
thinking field stripping
If the upstream doesn't support extended thinking, delete the thinking field from the request before forwarding.
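A combined sketch of this and the preceding `cache_control` stripping, assuming a parsed Anthropic-format request dict:

```python
def strip_unsupported(payload: dict) -> dict:
    """Remove thinking config and cache_control markers before forwarding
    to an upstream that supports neither."""
    payload.pop("thinking", None)
    system = payload.get("system")
    if isinstance(system, list):
        for block in system:
            block.pop("cache_control", None)
    for msg in payload.get("messages", []):
        if isinstance(msg.get("content"), list):
            for block in msg["content"]:
                block.pop("cache_control", None)
    for tool in payload.get("tools", []):
        tool.pop("cache_control", None)
    return payload
```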
Image format conversion
Anthropic uses {type:"image", source:{type:"base64", media_type:"image/png", data:"..."}}.
OpenAI uses {type:"image_url", image_url:{url:"data:image/png;base64,..."}}.
Convert between them when translating.
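Both directions as a short sketch:

```python
def anthropic_image_to_openai(block: dict) -> dict:
    src = block["source"]
    if src["type"] == "base64":
        url = f"data:{src['media_type']};base64,{src['data']}"
    else:  # {"type": "url", "url": ...}
        url = src["url"]
    return {"type": "image_url", "image_url": {"url": url}}

def openai_image_to_anthropic(part: dict) -> dict:
    url = part["image_url"]["url"]
    if url.startswith("data:"):
        header, data = url.split(",", 1)  # "data:image/png;base64", "<data>"
        media_type = header[len("data:"):].split(";", 1)[0]
        return {"type": "image", "source": {
            "type": "base64", "media_type": media_type, "data": data}}
    return {"type": "image", "source": {"type": "url", "url": url}}
```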
Model Name Mapping
Support both short aliases and provider-prefixed names. Map legacy names to current ones:
# Alias → Provider-prefixed
claude-sonnet-4-6 → anthropic/claude-sonnet-4-6
claude-opus-4-6 → anthropic/claude-opus-4-6
gpt-5-mini → openai/gpt-5-mini
# Legacy → Current
claude-3-5-sonnet-20241022 → anthropic/claude-sonnet-4-5
claude-3-opus-20240229 → anthropic/claude-opus-4-6
# Auto-prefix by pattern
claude-* → anthropic/
gpt-* → openai/
gemini-* → google/
grok-* → xai/
The /v1/models endpoint should return both upstream models and local aliases (deduplicated). Cache with a TTL (e.g., 5 minutes).
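A sketch of the lookup order: exact alias map first, then pattern-based auto-prefixing. The map entries are examples from the table above, not an exhaustive list:

```python
# Illustrative alias map and prefix rules taken from the tables above.
ALIASES = {
    "claude-sonnet-4-6": "anthropic/claude-sonnet-4-6",
    "claude-opus-4-6": "anthropic/claude-opus-4-6",
    "claude-3-5-sonnet-20241022": "anthropic/claude-sonnet-4-5",
    "claude-3-opus-20240229": "anthropic/claude-opus-4-6",
}
PREFIX_RULES = [
    ("claude-", "anthropic/"),
    ("gpt-", "openai/"),
    ("gemini-", "google/"),
    ("grok-", "xai/"),
]

def resolve_model(name: str) -> str:
    if "/" in name:          # already provider-prefixed
        return name
    if name in ALIASES:      # exact alias (including legacy names)
        return ALIASES[name]
    for pattern, prefix in PREFIX_RULES:
        if name.startswith(pattern):
            return prefix + name
    return name              # unknown: pass through unchanged
```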
Implementation Checklist
Use this when building a new adapter or auditing an existing one.
Per-endpoint checklist
- [ ] Auth validation — check Bearer token before processing
- [ ] Request parsing — handle both `stream: true` and `stream: false`
- [ ] Model mapping — translate aliases to upstream model IDs
- [ ] Format translation — convert request to upstream format
- [ ] Error classification — implement all 3 tiers + 401 handling
- [ ] Retry loop — up to N accounts, track tried IDs
- [ ] SSE headers — Content-Type, Cache-Control, Connection, X-Accel-Buffering
- [ ] Flush after each event — critical for real-time streaming
- [ ] Usage tracking — capture token counts from upstream, include in final event
Per-format checklist
OpenAI Chat Completions:
- [ ] Passthrough streaming (if upstream is OpenAI-compatible)
- [ ] `data: [DONE]` terminator
- [ ] Consistent `id` across chunks
Anthropic Messages:
- [ ] `message_start` → `ping` → content blocks → `message_delta` → `message_stop`
- [ ] Block index tracking (increments per block)
- [ ] Thinking blocks with `signature_delta` before close
- [ ] Tool use: `content_block_start` with `tool_use` type → `input_json_delta` chunks
- [ ] `cache_creation_input_tokens` and `cache_read_input_tokens` in usage (even if 0)
OpenAI Responses:
- [ ] `sequence_number` on every event (incrementing from 0)
- [ ] `item_id` on content_part and text delta/done events
- [ ] `response.in_progress` event after `response.created`
- [ ] `annotations: []` in all `output_text` parts
- [ ] Full response metadata in `response.created` and `response.completed`
- [ ] `function_call` as separate output items (not embedded in message)
Debugging Empty Responses
When a client shows empty output but curl works, check these in order:
- Missing `sequence_number` (Responses API) — the most common cause for Cherry Studio
- Missing `item_id` on delta events (Responses API) — the client can't associate text with its item
- Missing `response.in_progress` event — some clients wait for it
- Missing `annotations: []` in output_text parts — causes parse failures
- Missing `ping` event (Anthropic) — Claude Code may stall
- No flush after each SSE event — data sits in a buffer and never reaches the client
- Nginx buffering — missing `X-Accel-Buffering: no` header
- Wrong `Content-Type` — must be `text/event-stream`, not `application/json`
- Missing double newline — SSE events must end with `\n\n`, not just `\n`

A quick way to check the wire-level items is to dump the raw stream, as sketched below.
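A stdlib-only Python dump that shows exactly the bytes a client sees; the URL, model, and API key below are placeholders to adjust for your deployment:

```python
import json
import urllib.request

req = urllib.request.Request(
    "http://localhost:8080/v1/chat/completions",
    data=json.dumps({
        "model": "claude-sonnet-4-6",
        "stream": True,
        "messages": [{"role": "user", "content": "Hi"}],
    }).encode(),
    headers={"Content-Type": "application/json",
             "Authorization": "Bearer sk-test"},
)
with urllib.request.urlopen(req) as resp:
    for raw_line in resp:
        print(repr(raw_line))  # repr() makes \n boundaries and data: prefixes visible
```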
For more details on SSE wire format requirements, see the SSE Wire Format section below. For format conversion code patterns, see the Conversion Patterns section below.
Part 2: Reference Material (references/)
Conversion Patterns
Language-agnostic pseudocode for converting between all three LLM API formats. Real-world patterns extracted from a production proxy handling 470+ accounts.
Table of Contents
- Anthropic Request → OpenAI Request
- OpenAI Response → Anthropic Response
- OpenAI Stream → Anthropic Stream
- Responses Request → Chat Completions Request
- Chat Completions Stream → Responses Stream
- Chat Completions Response → Responses Response
- Tool Format Conversion
Anthropic to OpenAI Request
function translateAnthropicToOpenAI(anthropicPayload):
openai = {
model: anthropicPayload.model,
messages: [],
max_tokens: anthropicPayload.max_tokens ?? 4096,
stream: false
}
// Pass through optional params
if anthropicPayload.temperature: openai.temperature = anthropicPayload.temperature
if anthropicPayload.top_p: openai.top_p = anthropicPayload.top_p
// System message
if anthropicPayload.system:
if typeof system == string:
openai.messages.push({role: "system", content: system})
if typeof system == array:
text = system.filter(b => b.type == "text").map(b => b.text).join("")
openai.messages.push({role: "system", content: text})
// Messages
for msg in anthropicPayload.messages:
if typeof msg.content == string:
openai.messages.push({role: msg.role, content: msg.content})
continue
// Array content — process each block
openContent = []
toolCalls = []
for block in msg.content:
switch block.type:
case "text":
openContent.push({type: "text", text: block.text})
case "thinking", "redacted_thinking":
skip // Upstream doesn't support thinking
case "image":
if block.source.type == "base64":
url = "data:{block.source.media_type};base64,{block.source.data}"
openContent.push({type: "image_url", image_url: {url}})
else if block.source.type == "url":
openContent.push({type: "image_url", image_url: {url: block.source.url}})
case "tool_use":
toolCalls.push({
id: block.id,
type: "function",
function: {name: block.name, arguments: JSON.stringify(block.input)}
})
case "tool_result":
// Tool results become separate messages
resultContent = ""
if typeof block.content == string:
resultContent = block.content
else if typeof block.content == array:
resultContent = block.content.filter(b => b.text).map(b => b.text).join("")
openai.messages.push({
role: "tool",
tool_call_id: block.tool_use_id,
content: resultContent
})
// Build the message
if openContent.length > 0 or toolCalls.length > 0:
msgBody = {role: msg.role}
if openContent.length > 0:
// Simplify: single text block with no images → plain string
if openContent.length == 1 and openContent[0].type == "text":
msgBody.content = openContent[0].text
else:
msgBody.content = openContent
if toolCalls.length > 0:
msgBody.tool_calls = toolCalls
openai.messages.push(msgBody)
// Tools
if anthropicPayload.tools:
openai.tools = anthropicPayload.tools.map(t => ({
type: "function",
function: {
name: t.name,
description: t.description,
parameters: t.input_schema
}
}))
// Tool choice mapping
if anthropicPayload.tool_choice:
switch anthropicPayload.tool_choice.type:
case "auto": openai.tool_choice = "auto"
case "any": openai.tool_choice = "required"
case "none": openai.tool_choice = "none"
case "tool":
openai.tool_choice = {
type: "function",
function: {name: anthropicPayload.tool_choice.name}
}
return openai
OpenAI to Anthropic Response
function openaiSyncToAnthropicJSON(openaiResp, origModel, thinkingEnabled):
msgID = "msg_" + randomHex(12)
choice = openaiResp.choices[0] ?? {}
message = choice.message ?? {}
content = []
// Thinking block (if upstream provided reasoning_content)
if thinkingEnabled and message.reasoning_content:
content.push({type: "thinking", thinking: message.reasoning_content, signature: ""})
// Text content
if message.content:
content.push({type: "text", text: message.content})
// Tool calls
if message.tool_calls:
for tc in message.tool_calls:
input = JSON.parse(tc.function.arguments) ?? {}
content.push({
type: "tool_use",
id: tc.id,
name: tc.function.name,
input: input
})
// Stop reason mapping
stopReason = "end_turn"
switch choice.finish_reason:
case "tool_calls": stopReason = "tool_use"
case "length": stopReason = "max_tokens"
case "stop": stopReason = "end_turn"
return {
id: msgID,
type: "message",
role: "assistant",
content: content,
model: origModel,
stop_reason: stopReason,
stop_sequence: null,
usage: {
input_tokens: openaiResp.usage?.prompt_tokens ?? 0,
output_tokens: openaiResp.usage?.completion_tokens ?? 0,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0
}
}
OpenAI Stream to Anthropic Stream
This is the most complex conversion — translating OpenAI streaming chunks into Anthropic SSE events in real-time.
function streamOpenAIToAnthropic(upstreamSSE, writer, origModel, thinkingEnabled):
msgID = "msg_" + randomHex(12)
// State tracking
blockIndex = 0
textBlockStarted = false
reasoningBlockStarted = false
toolCallBlockStarted = {} // openai_tc_index → bool
toolCallToBlock = {} // openai_tc_index → anthropic_block_index
finishReason = ""
// Send initial events
emit("message_start", {
type: "message_start",
message: {
id: msgID, type: "message", role: "assistant",
content: [], model: origModel,
stop_reason: null, stop_sequence: null,
usage: {input_tokens: 0, output_tokens: 0,
cache_creation_input_tokens: 0, cache_read_input_tokens: 0}
}
})
emit("ping", {type: "ping"})
// Process each upstream chunk
for chunk in upstreamSSE:
delta = chunk.choices[0].delta
// Capture finish_reason
if chunk.choices[0].finish_reason:
finishReason = chunk.choices[0].finish_reason
// Handle reasoning_content (thinking)
if delta.reasoning_content and thinkingEnabled:
if not reasoningBlockStarted:
emit("content_block_start", {
type: "content_block_start", index: blockIndex,
content_block: {type: "thinking", thinking: "", signature: ""}
})
reasoningBlockStarted = true
emit("content_block_delta", {
type: "content_block_delta", index: blockIndex,
delta: {type: "thinking_delta", thinking: delta.reasoning_content}
})
// Handle text content
if delta.content:
// Close reasoning block first
if reasoningBlockStarted and not textBlockStarted:
emit("content_block_delta", {
type: "content_block_delta", index: blockIndex,
delta: {type: "signature_delta", signature: ""}
})
emit("content_block_stop", {type: "content_block_stop", index: blockIndex})
blockIndex++
if not textBlockStarted:
emit("content_block_start", {
type: "content_block_start", index: blockIndex,
content_block: {type: "text", text: ""}
})
textBlockStarted = true
emit("content_block_delta", {
type: "content_block_delta", index: blockIndex,
delta: {type: "text_delta", text: delta.content}
})
// Handle tool calls
if delta.tool_calls:
for tc in delta.tool_calls:
tcIdx = tc.index
if not toolCallBlockStarted[tcIdx]:
// Close text block before first tool call
if textBlockStarted and toolCallBlockStarted.isEmpty():
emit("content_block_stop", {index: blockIndex})
blockIndex++
toolCallToBlock[tcIdx] = blockIndex
emit("content_block_start", {
type: "content_block_start", index: blockIndex,
content_block: {
type: "tool_use", id: tc.id,
name: tc.function.name, input: {}
}
})
toolCallBlockStarted[tcIdx] = true
if tc.function.arguments:
emit("content_block_delta", {
type: "content_block_delta", index: blockIndex,
delta: {type: "input_json_delta", partial_json: tc.function.arguments}
})
else:
bi = toolCallToBlock[tcIdx]
if tc.function.arguments:
emit("content_block_delta", {
type: "content_block_delta", index: bi,
delta: {type: "input_json_delta", partial_json: tc.function.arguments}
})
// Close all open blocks
if reasoningBlockStarted and not textBlockStarted and toolCallBlockStarted.isEmpty():
emit("content_block_delta", {index: blockIndex, delta: {type: "signature_delta", signature: ""}})
emit("content_block_stop", {index: blockIndex})
if textBlockStarted and toolCallBlockStarted.isEmpty():
emit("content_block_stop", {index: blockIndex})
for tcIdx in toolCallBlockStarted:
emit("content_block_stop", {index: toolCallToBlock[tcIdx]})
// Stop reason mapping
stopReason = "end_turn"
switch finishReason:
case "tool_calls": stopReason = "tool_use"
case "length": stopReason = "max_tokens"
emit("message_delta", {
type: "message_delta",
delta: {stop_reason: stopReason},
usage: {output_tokens: 0}
})
emit("message_stop", {type: "message_stop"})
Responses to Chat Completions Request
function translateResponsesInput(responsesPayload):
chatPayload = {
model: responsesPayload.model,
stream: false,
max_tokens: responsesPayload.max_output_tokens ?? 4096
}
if responsesPayload.temperature: chatPayload.temperature = responsesPayload.temperature
if responsesPayload.top_p: chatPayload.top_p = responsesPayload.top_p
messages = []
// Instructions → system message
if responsesPayload.instructions:
messages.push({role: "system", content: responsesPayload.instructions})
// Input handling
if typeof responsesPayload.input == string:
messages.push({role: "user", content: responsesPayload.input})
else if typeof responsesPayload.input == array:
for item in responsesPayload.input:
role = item.role
if typeof item.content == string:
messages.push({role, content: item.content})
else if typeof item.content == array:
// Flatten input_text/output_text blocks to plain text
textParts = []
for block in item.content:
if block.type in ["input_text", "output_text", "text"]:
textParts.push(block.text)
if textParts.length > 0:
messages.push({role, content: textParts.join("\n")})
chatPayload.messages = messages
// Tools passthrough
if responsesPayload.tools: chatPayload.tools = responsesPayload.tools
if responsesPayload.tool_choice: chatPayload.tool_choice = responsesPayload.tool_choice
return chatPayload
Chat Completions Stream to Responses Stream
function streamChatToResponses(upstreamSSE, writer, origModel):
respID = "resp_" + randomHex(12)
msgID = "msg_" + randomHex(12)
seqNum = 0
created = now()
// Helper: emit with auto-incrementing sequence_number
function emit(eventType, data):
data.sequence_number = seqNum++
writeSSE(writer, eventType, data)
flush()
// Build the "in progress" response object with all required fields
inProgressResp = {
id: respID, object: "response", created_at: created,
status: "in_progress", model: origModel,
output: [], usage: null,
error: null, incomplete_details: null, instructions: null,
metadata: {}, parallel_tool_calls: true,
temperature: 1.0, tool_choice: "auto", tools: [],
top_p: 1.0, max_output_tokens: null, previous_response_id: null,
reasoning: {effort: null, summary: null},
store: true, truncation: "disabled", user: null
}
emit("response.created", {type: "response.created", response: inProgressResp})
emit("response.in_progress", {type: "response.in_progress", response: inProgressResp})
emit("response.output_item.added", {
type: "response.output_item.added", output_index: 0,
item: {type: "message", id: msgID, status: "in_progress",
role: "assistant", content: []}
})
emit("response.content_part.added", {
type: "response.content_part.added",
item_id: msgID, output_index: 0, content_index: 0,
part: {type: "output_text", text: "", annotations: []}
})
// Track state
fullText = ""
textBlockSent = false
toolCallNames = {} // index → name
toolCallIDs = {} // index → id
toolCallArgs = {} // index → accumulated arguments
upstreamUsage = {input_tokens: 0, output_tokens: 0, total_tokens: 0}
for chunk in upstreamSSE:
// Capture usage from final chunk
if chunk.usage:
upstreamUsage = {
input_tokens: chunk.usage.prompt_tokens,
output_tokens: chunk.usage.completion_tokens,
total_tokens: chunk.usage.total_tokens
}
delta = chunk.choices[0]?.delta
if not delta: continue
if delta.content:
fullText += delta.content
textBlockSent = true
emit("response.output_text.delta", {
type: "response.output_text.delta",
item_id: msgID, output_index: 0, content_index: 0,
delta: delta.content
})
// Accumulate tool calls
if delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
if tc.id: toolCallIDs[idx] = tc.id
if tc.function?.name: toolCallNames[idx] = tc.function.name
if tc.function?.arguments: toolCallArgs[idx] = (toolCallArgs[idx] ?? "") + tc.function.arguments // default to "" on first fragment
// Close text block
allOutput = []
outputIndex = 0
if textBlockSent or fullText:
emit("response.output_text.done", {
type: "response.output_text.done",
item_id: msgID, output_index: 0, content_index: 0,
text: fullText
})
emit("response.content_part.done", {
type: "response.content_part.done",
item_id: msgID, output_index: 0, content_index: 0,
part: {type: "output_text", text: fullText, annotations: []}
})
msgItem = {
type: "message", id: msgID, status: "completed",
role: "assistant",
content: [{type: "output_text", text: fullText, annotations: []}]
}
emit("response.output_item.done", {
type: "response.output_item.done", output_index: outputIndex, item: msgItem
})
allOutput.push(msgItem)
outputIndex++
// Emit function_call items
for i in sorted(toolCallNames.keys()):
fcID = "fc_" + randomHex(12)
fcItem = {
type: "function_call", id: fcID,
call_id: toolCallIDs[i], name: toolCallNames[i],
arguments: toolCallArgs[i], status: "completed"
}
emit("response.output_item.added", {
type: "response.output_item.added", output_index: outputIndex, item: fcItem
})
emit("response.function_call_arguments.done", {
type: "response.function_call_arguments.done",
item_id: fcID, output_index: outputIndex,
arguments: toolCallArgs[i]
})
emit("response.output_item.done", {
type: "response.output_item.done", output_index: outputIndex, item: fcItem
})
allOutput.push(fcItem)
outputIndex++
// If nothing output, send empty message
if allOutput.isEmpty():
emptyItem = {type: "message", id: msgID, status: "completed",
role: "assistant",
content: [{type: "output_text", text: "", annotations: []}]}
emit("response.output_item.done", {output_index: 0, item: emptyItem})
allOutput.push(emptyItem)
// Final event
completedResp = copy(inProgressResp)
completedResp.status = "completed"
completedResp.output = allOutput
completedResp.usage = upstreamUsage
emit("response.completed", {type: "response.completed", response: completedResp})
Chat Completions to Responses Response
Non-streaming conversion:
function openaiChatToResponsesJSON(openaiResp, origModel):
respID = "resp_" + randomHex(12)
msgID = "msg_" + randomHex(12)
choice = openaiResp.choices[0]
message = choice?.message ?? {}
text = message.content ?? ""
output = []
// Text message
if text:
output.push({
type: "message", id: msgID, status: "completed",
role: "assistant",
content: [{type: "output_text", text: text, annotations: []}]
})
// Function calls
if message.tool_calls:
for tc in message.tool_calls:
output.push({
type: "function_call",
id: "fc_" + randomHex(12),
call_id: tc.id,
name: tc.function.name,
arguments: tc.function.arguments,
status: "completed"
})
// Empty fallback
if output.isEmpty():
output.push({
type: "message", id: msgID, status: "completed",
role: "assistant",
content: [{type: "output_text", text: "", annotations: []}]
})
return {
id: respID,
object: "response",
created_at: openaiResp.created ?? now(),
status: "completed",
model: origModel,
output: output,
usage: {
input_tokens: openaiResp.usage?.prompt_tokens ?? 0,
output_tokens: openaiResp.usage?.completion_tokens ?? 0,
total_tokens: openaiResp.usage?.total_tokens ?? 0
}
}
Tool Format Conversion
Anthropic tools → OpenAI tools
Anthropic:
{name: "get_weather", description: "Get weather", input_schema: {type: "object", properties: {...}}}
OpenAI:
{type: "function", function: {name: "get_weather", description: "Get weather", parameters: {type: "object", properties: {...}}}}
Mapping: input_schema → function.parameters
OpenAI tool_calls → Anthropic tool_use
OpenAI (in message):
{tool_calls: [{id: "call_abc", type: "function", function: {name: "get_weather", arguments: '{"location":"SF"}'}}]}
Anthropic (in content):
{type: "tool_use", id: "call_abc", name: "get_weather", input: {location: "SF"}}
Mapping: JSON.parse(arguments) → input
Anthropic tool_result → OpenAI tool message
Anthropic (in content):
{type: "tool_result", tool_use_id: "call_abc", content: "72°F and sunny"}
OpenAI (separate message):
{role: "tool", tool_call_id: "call_abc", content: "72°F and sunny"}
Anthropic tool_choice → OpenAI tool_choice
| Anthropic | OpenAI |
|---|---|
| `{type: "auto"}` | `"auto"` |
| `{type: "any"}` | `"required"` |
| `{type: "none"}` | `"none"` |
| `{type: "tool", name: "X"}` | `{type: "function", function: {name: "X"}}` |
SSE Wire Format
Detailed byte-level format requirements for Server-Sent Events across all three LLM API protocols.
Table of Contents
- General SSE Rules
- OpenAI Chat Completions SSE
- Anthropic Messages SSE
- OpenAI Responses API SSE
- Keepalive and Timeout
- Common Pitfalls
General SSE Rules
SSE is a web standard (now maintained in the WHATWG HTML specification). Each event is:
[event: <type>\n]
data: <payload>\n
\n
Key rules:
- Lines end with `\n` (LF), not `\r\n`
- Each event terminates with a blank line (`\n\n`)
- The `event:` line is optional (OpenAI Chat omits it; Anthropic and Responses use it)
- `data:` can span multiple lines; each line starts with `data:`
- A comment line starts with `:` — used for keepalive (`: keepalive\n\n`)
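A minimal parser sketch that follows these rules; it yields `(event_type, data)` pairs, with an `event_type` of `None` for OpenAI Chat events that omit the `event:` line:

```python
def parse_sse(lines):
    """Minimal SSE parser. `lines` is an iterable of decoded text lines
    without trailing newlines; multi-line data fields are joined with \\n."""
    event, data = None, []
    for line in lines:
        if line == "":                 # blank line terminates the event
            if data:
                yield event, "\n".join(data)
            event, data = None, []
        elif line.startswith(":"):     # comment (keepalive), ignore
            continue
        elif line.startswith("event:"):
            event = line[len("event:"):].lstrip(" ")
        elif line.startswith("data:"):
            data.append(line[len("data:"):].lstrip(" "))
```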
Flushing
After writing each SSE event to the response writer, you must flush the buffer. Without flushing, events accumulate in the server/framework's output buffer and the client sees nothing until the buffer fills or the stream ends.
- In Go: `flusher.Flush()` via the `http.Flusher` interface
- In Python (FastAPI/Starlette): `yield` in `StreamingResponse` auto-flushes
- In Node.js (Express): `res.flush()` or `res.flushHeaders()` + `res.write()`
- In Rust (axum): use `axum::response::Sse`, which auto-flushes
Headers
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no
X-Accel-Buffering: no is critical when behind nginx — without it, nginx buffers the entire response before forwarding.
OpenAI Chat Completions SSE
Format: data: <JSON>\n\n (no event: field)
Wire example (raw bytes)
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{"role":"assistant"},"logprobs":null,"finish_reason":null}]}\n
\n
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}]}\n
\n
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{"content":" there!"},"logprobs":null,"finish_reason":null}]}\n
\n
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1700000000,"model":"gpt-5-mini","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":8,"completion_tokens":3,"total_tokens":11}}\n
\n
data: [DONE]\n
\n
Chunk fields
| Field | First chunk | Middle chunks | Final chunk |
|---|---|---|---|
| `id` | Present | Same value | Same value |
| `object` | `"chat.completion.chunk"` | Same | Same |
| `created` | Unix timestamp | Same | Same |
| `model` | Model ID | Same | Same |
| `system_fingerprint` | Optional | Same | Same |
| `choices[0].index` | `0` | `0` | `0` |
| `choices[0].delta.role` | `"assistant"` | Absent | Absent |
| `choices[0].delta.content` | Absent | Text fragment | Absent |
| `choices[0].delta.tool_calls` | Absent | Tool call fragments | Absent |
| `choices[0].logprobs` | `null` | `null` | `null` |
| `choices[0].finish_reason` | `null` | `null` | `"stop"` / `"tool_calls"` / `"length"` |
| `usage` | Absent | Absent | Token counts (optional) |
Tool call streaming detail
Tool calls arrive incrementally across multiple chunks:
# Chunk 1: tool call header (id, name, empty arguments)
delta.tool_calls: [{"index":0,"id":"call_abc","type":"function","function":{"name":"get_weather","arguments":""}}]
# Chunk 2-N: argument fragments
delta.tool_calls: [{"index":0,"function":{"arguments":"{\"loc"}}]
delta.tool_calls: [{"index":0,"function":{"arguments":"ation\":"}}]
delta.tool_calls: [{"index":0,"function":{"arguments":" \"SF\"}"}}]
# Final chunk
delta: {}, finish_reason: "tool_calls"
Accumulate arguments fragments per tool call index. The id and name only appear once.
Anthropic Messages SSE
Format: event: <type>\ndata: <JSON>\n\n
Complete event sequence
event: message_start\n
data: {"type":"message_start","message":{...}}\n
\n
event: ping\n
data: {"type":"ping"}\n
\n
event: content_block_start\n
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}\n
\n
event: content_block_delta\n
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n
\n
event: content_block_stop\n
data: {"type":"content_block_stop","index":0}\n
\n
event: message_delta\n
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":15}}\n
\n
event: message_stop\n
data: {"type":"message_stop"}\n
\n
Event type reference
| Event | Payload | When |
|---|---|---|
| `message_start` | Full message object (empty content, usage with input_tokens) | First event |
| `ping` | `{"type":"ping"}` | After `message_start` |
| `content_block_start` | Block type + index | Before each content block |
| `content_block_delta` | Delta for current block | During content generation |
| `content_block_stop` | Block index | After each content block |
| `message_delta` | `stop_reason` + output usage | After all blocks |
| `message_stop` | `{"type":"message_stop"}` | Final event |
Content block types and their deltas
| Block type | Start payload | Delta type | Delta payload |
|---|---|---|---|
| `text` | `{"type":"text","text":""}` | `text_delta` | `{"type":"text_delta","text":"..."}` |
| `thinking` | `{"type":"thinking","thinking":"","signature":""}` | `thinking_delta` | `{"type":"thinking_delta","thinking":"..."}` |
| `thinking` (close) | — | `signature_delta` | `{"type":"signature_delta","signature":""}` |
| `tool_use` | `{"type":"tool_use","id":"...","name":"...","input":{}}` | `input_json_delta` | `{"type":"input_json_delta","partial_json":"..."}` |
Block ordering rules
- Thinking blocks come first (if thinking is enabled)
- Text blocks come after thinking
- Tool use blocks come after text
- Each block increments the `index`
- Close the thinking block (`signature_delta` + `content_block_stop`) before starting the text block
OpenAI Responses API SSE
Format: event: <type>\ndata: <JSON>\n\n
This is the strictest format. Every event must include sequence_number.
Complete event sequence for text response
event: response.created\n
data: {"type":"response.created","sequence_number":0,"response":{...full response object...}}\n
\n
event: response.in_progress\n
data: {"type":"response.in_progress","sequence_number":1,"response":{...same...}}\n
\n
event: response.output_item.added\n
data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"type":"message","id":"msg_abc","status":"in_progress","role":"assistant","content":[]}}\n
\n
event: response.content_part.added\n
data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"","annotations":[]}}\n
\n
event: response.output_text.delta\n
data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_abc","output_index":0,"content_index":0,"delta":"Hello"}\n
\n
event: response.output_text.done\n
data: {"type":"response.output_text.done","sequence_number":N,"item_id":"msg_abc","output_index":0,"content_index":0,"text":"Hello there!"}\n
\n
event: response.content_part.done\n
data: {"type":"response.content_part.done","sequence_number":N+1,"item_id":"msg_abc","output_index":0,"content_index":0,"part":{"type":"output_text","text":"Hello there!","annotations":[]}}\n
\n
event: response.output_item.done\n
data: {"type":"response.output_item.done","sequence_number":N+2,"output_index":0,"item":{...completed item...}}\n
\n
event: response.completed\n
data: {"type":"response.completed","sequence_number":N+3,"response":{...completed response...}}\n
\n
Response object — required fields
The response object in response.created and response.completed must include all these fields (use null for absent values):
{
"id": "resp_abc",
"object": "response",
"created_at": 1700000000,
"status": "in_progress",
"model": "claude-sonnet-4-6",
"output": [],
"usage": null,
"error": null,
"incomplete_details": null,
"instructions": null,
"metadata": {},
"parallel_tool_calls": true,
"temperature": 1.0,
"tool_choice": "auto",
"tools": [],
"top_p": 1.0,
"max_output_tokens": null,
"previous_response_id": null,
"reasoning": {"effort": null, "summary": null},
"store": true,
"truncation": "disabled",
"user": null
}
Event field matrix
| Event | `sequence_number` | `item_id` | `output_index` | `content_index` |
|---|---|---|---|---|
| `response.created` | Yes | No | No | No |
| `response.in_progress` | Yes | No | No | No |
| `response.output_item.added` | Yes | No | Yes | No |
| `response.content_part.added` | Yes | Yes | Yes | Yes |
| `response.output_text.delta` | Yes | Yes | Yes | Yes |
| `response.output_text.done` | Yes | Yes | Yes | Yes |
| `response.content_part.done` | Yes | Yes | Yes | Yes |
| `response.output_item.done` | Yes | No | Yes | No |
| `response.function_call_arguments.delta` | Yes | Yes | Yes | No |
| `response.function_call_arguments.done` | Yes | Yes | Yes | No |
| `response.completed` | Yes | No | No | No |
Keepalive and Timeout
For long-running requests (e.g., complex reasoning), the stream may go silent for 30+ seconds. Without keepalive, reverse proxies (nginx, Cloudflare) may close the connection.
SSE comment keepalive
Send a comment every 5-15 seconds during silent periods:
: keepalive\n
\n
This is a valid SSE comment (starts with :) — clients ignore it, but it keeps the TCP connection alive.
Implementation pattern
start keepalive timer (every 5s):
write ": keepalive\n\n"
flush
on each real SSE event:
reset keepalive timer
on stream end:
stop keepalive timer
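A Python sketch of that pattern using `threading.Timer`; a production server would integrate with its own event loop instead, and the lock is there because the timer fires on a separate thread:

```python
import threading

class Keepalive:
    """Emit an SSE comment during silent periods so reverse proxies don't
    drop the connection. Call bump() after every real event, stop() at end."""
    def __init__(self, writer, interval=5.0):
        self.writer, self.interval = writer, interval
        self.lock = threading.Lock()
        self._timer = None
        self.bump()

    def _fire(self):
        with self.lock:
            self.writer.write(b": keepalive\n\n")
            self.writer.flush()
        self.bump()  # re-arm for the next silent window

    def bump(self):
        # Reset the timer after each real event.
        if self._timer:
            self._timer.cancel()
        self._timer = threading.Timer(self.interval, self._fire)
        self._timer.daemon = True
        self._timer.start()

    def stop(self):
        if self._timer:
            self._timer.cancel()
```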
Client disconnect handling
When the client disconnects mid-stream:
- Detect via write error or context cancellation
- Stop the keepalive timer
- Close the upstream connection (don't waste upstream tokens)
- Clean up resources
Common Pitfalls
1. Missing double newline
Each SSE event ends with \n\n. A single \n means the event is not yet complete — the client will buffer and wait for more data.
2. No flush after write
Writing to the response writer doesn't mean the data reaches the client. You must flush after each event.
3. Writing headers after first write
HTTP headers must be written before the first body byte. If you call http.Error() after streaming has started, it becomes part of the body and corrupts the SSE stream.
4. Buffer size too small
Scanner/reader buffers must be large enough for the largest possible SSE line. A single chunk with a large tool call argument can exceed 64KB. Use at least 1MB buffer.
5. JSON in SSE data field
The JSON must be on a single line (no pretty-printing). Newlines in JSON would be interpreted as new SSE fields.
6. Forgetting data: [DONE] (OpenAI Chat)
Some clients (like OpenAI's own SDK) wait for [DONE] to finalize the response. Without it, the client may hang or timeout.
7. Inconsistent id across chunks (OpenAI Chat)
All chunks in one response must share the same id. Generating a new ID per chunk breaks clients that group chunks by ID.
8. Missing event: field (Anthropic/Responses)
Unlike OpenAI Chat (which only uses data:), Anthropic and Responses formats require the event: field. Without it, clients can't dispatch events by type.