How to Stream AI API Responses in Python and Node.js (2026 Guide)
Nobody wants to stare at a blank screen for 8 seconds waiting for Claude Opus to finish thinking. Streaming fixes that. Instead of waiting for the entire response to generate, you get tokens as they're produced — word by word, in real time.
This isn't just a UX nicety. Streaming changes how you build AI-powered apps. You can cancel bad responses early (saving tokens and money), show progress indicators, and build chat interfaces that feel responsive even when the model is generating a 2,000-word answer.
This guide covers everything: the protocol under the hood, production-ready code for Python and Node.js, error handling patterns, and how to pipe streams to a frontend. All examples use the OpenAI-compatible API format, so they work with GPT-5, Claude, DeepSeek, Qwen — any model behind an OpenAI-compatible endpoint.
How Streaming Actually Works
When you set `stream: true` in your API request, the server switches from a normal HTTP response to Server-Sent Events (SSE). Instead of one big JSON blob at the end, you get a series of small chunks, each containing one or a few tokens.
Each chunk looks like this:
```
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
```
Three things to notice. First, each chunk has a `delta` instead of a `message` — it's the incremental piece, not the full text. Second, `finish_reason` stays `null` until the model is done. Third, the stream ends with `data: [DONE]`.
The first token usually arrives in 200-500ms (depending on the model and provider), compared to 3-15 seconds for a non-streamed response. That's the difference between "fast" and "is this thing broken?"
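To make the protocol concrete, here's a minimal sketch of the parsing the SDKs do for you, assuming the OpenAI chunk format shown above. It's deliberately simplified (`parse_sse_chunks` is a name invented for this example, and it ignores real-world issues like lines split across network reads):

```python
import json

def parse_sse_chunks(raw: str) -> str:
    """Parse raw SSE lines from a chat completion stream into the full text."""
    pieces = []
    for line in raw.splitlines():
        # SSE payload lines start with "data: "; everything else is ignored
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        # The sentinel that marks the end of the stream
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        delta = chunk["choices"][0]["delta"]
        # The final chunk has an empty delta, so guard against missing content
        content = delta.get("content")
        if content:
            pieces.append(content)
    return "".join(pieces)
```

Feed it the example chunks above and it returns `"Hello world"`. In practice you'd let the SDK do this, but knowing the shape of the wire format helps when debugging.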
Python: Streaming with the OpenAI SDK
The OpenAI Python SDK handles SSE parsing for you. Here's the minimal version:
```python
from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://api.kissapi.ai/v1"  # works with any OpenAI-compatible endpoint
)

stream = client.chat.completions.create(
    model="claude-sonnet-4-6",
    messages=[{"role": "user", "content": "Write a Python function to merge two sorted lists"}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
print()  # newline at the end
```
That `flush=True` matters. Without it, Python buffers the output and you lose the real-time effect in the terminal.
Collecting the Full Response
Usually you want to stream to the user AND keep the full text for logging or further processing:
```python
full_response = []

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        full_response.append(content)
        print(content, end="", flush=True)

complete_text = "".join(full_response)
# Now you have the full response for logging, DB storage, etc.
```
Async Streaming
If you're building a web server with FastAPI or similar, you'll want the async version:
```python
from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://api.kissapi.ai/v1"
)

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="claude-sonnet-4-6",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content
```
This is an async generator you can plug directly into a FastAPI `StreamingResponse`:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat")
async def chat(q: str):
    return StreamingResponse(
        stream_response(q),
        media_type="text/plain"
    )
```
Node.js: Streaming with the OpenAI SDK
Same idea in JavaScript. The OpenAI Node SDK returns an async iterable:
```javascript
import OpenAI from "openai";

const client = new OpenAI({
  apiKey: "your-api-key",
  baseURL: "https://api.kissapi.ai/v1",
});

const stream = await client.chat.completions.create({
  model: "claude-sonnet-4-6",
  messages: [{ role: "user", content: "Explain event loops in Node.js" }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content);
  }
}
console.log(); // newline
```
Express + SSE to the Browser
Here's how to pipe a streaming AI response to a browser using Server-Sent Events:
```javascript
import express from "express";
import OpenAI from "openai";

const app = express();
const client = new OpenAI({
  apiKey: process.env.API_KEY,
  baseURL: "https://api.kissapi.ai/v1",
});

app.get("/api/chat", async (req, res) => {
  const { prompt } = req.query;

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const stream = await client.chat.completions.create({
    model: "claude-sonnet-4-6",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ text: content })}\n\n`);
    }
  }

  res.write("data: [DONE]\n\n");
  res.end();
});

app.listen(3000);
```
On the frontend, consume it with EventSource or a fetch-based reader:
```javascript
const response = await fetch(`/api/chat?prompt=${encodeURIComponent(prompt)}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // { stream: true } handles multi-byte characters split across chunks
  const text = decoder.decode(value, { stream: true });
  // Parse SSE lines and update the UI. Caveat: a network read can split an
  // SSE line in half -- in production, buffer partial lines across reads
  // instead of parsing each chunk in isolation.
  const lines = text.split("\n").filter(l => l.startsWith("data: "));
  for (const line of lines) {
    const data = line.slice(6);
    if (data === "[DONE]") continue;
    const { text: token } = JSON.parse(data);
    document.getElementById("output").textContent += token;
  }
}
```
Error Handling That Won't Bite You
Streams fail differently than regular requests. The connection can drop mid-response, the server can send malformed chunks, or you might hit rate limits after the stream has started. Here's a pattern that handles all of these:
```python
import time
from openai import OpenAI, APIError, APIConnectionError, RateLimitError

client = OpenAI(api_key="your-key", base_url="https://api.kissapi.ai/v1")

def stream_with_retry(messages, model="claude-sonnet-4-6", max_retries=3):
    # Note: a retry restarts the stream from scratch, so the consumer may see
    # text it already received. Track what you've yielded if that matters.
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True
            )
            full_text = []  # keep the full response for logging if you need it
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_text.append(content)
                    yield content
            return  # success, exit retry loop
        except RateLimitError:
            wait = 2 ** attempt
            print(f"\nRate limited. Waiting {wait}s...")
            time.sleep(wait)
        except APIConnectionError:
            if attempt < max_retries - 1:
                print("\nConnection dropped. Retrying...")
                time.sleep(1)
            else:
                raise
        except APIError as e:
            print(f"\nAPI error: {e}")
            raise
    # Don't fail silently if every attempt was rate limited
    raise RuntimeError("Exhausted retries without completing the stream")
```
The key insight: wrap the entire stream consumption in the try block, not just the initial request. Connection drops happen mid-stream, not at the start.
Streaming vs. Non-Streaming: When to Use Which
| Use Streaming When | Skip Streaming When |
|---|---|
| Building chat interfaces | Batch processing (no user waiting) |
| Long responses (>500 tokens) | Short responses (classification, yes/no) |
| You want early cancellation | You need the full response before processing |
| Real-time UX matters | JSON mode (some providers don't stream JSON well) |
| Showing "typing" indicators | Function calling with complex tool use |
One gotcha: streaming responses don't include usage data (token counts) in every provider's implementation. If you need exact token counts for billing, you might need to count them yourself or make a separate call. Some providers like OpenAI now include usage in the final chunk — check your provider's docs.
Performance Tips
A few things I've learned from running streaming in production:
- Set reasonable timeouts. The first chunk should arrive within 5 seconds for most models. If it doesn't, something's wrong. Set a timeout on the initial connection, not on the full stream.
- Buffer before rendering. Don't update the DOM on every single token. Batch 3-5 tokens and render them together. The user won't notice the difference, but your browser will thank you.
- Use `stream_options` for usage data. If your provider supports it, pass `stream_options={"include_usage": True}` to get token counts in the final chunk.
- Cancel early when you can. If the user navigates away or the response is clearly wrong, abort the stream. Every token you don't generate is money saved.
```python
# Python: cancel a stream early
stream = client.chat.completions.create(
    model="claude-sonnet-4-6",
    messages=messages,
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        if "ERROR" in content or should_cancel():
            stream.close()  # stop generating, stop paying
            break
        print(content, end="", flush=True)
```
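The `stream_options` tip can be sketched as a small helper. This is a minimal example assuming OpenAI's convention, where the final chunk before `[DONE]` carries a populated `usage` field and an empty `choices` list; `extract_usage` is a name invented here, and it operates on plain dicts so the chunk shape is easy to see:

```python
def extract_usage(chunks):
    """Accumulate streamed text and pull the usage object from the final chunk.

    Assumes stream_options={"include_usage": True} was set, so the last chunk
    carries a `usage` field. Chunks are plain dicts for illustration.
    """
    usage = None
    text = []
    for chunk in chunks:
        # Content chunks carry deltas; the usage chunk has an empty choices list
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                text.append(content)
        if chunk.get("usage"):
            usage = chunk["usage"]
    return "".join(text), usage
```

With real SDK chunks the shape is the same, just accessed via attributes (`chunk.usage`) instead of dict keys. If your provider never sends the usage chunk, `usage` stays `None` and you fall back to counting tokens yourself.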
Streaming with Extended Thinking
Claude's extended thinking mode works with streaming too. The model first streams its reasoning (thinking tokens), then streams the final answer. You can show or hide the thinking phase in your UI:
```python
# Extended thinking tokens come as a separate content block
# The SDK handles this -- thinking chunks have a different type
stream = client.chat.completions.create(
    model="claude-sonnet-4-6",
    messages=messages,
    stream=True,
    extra_body={"thinking": {"type": "enabled", "budget_tokens": 5000}}
)

for chunk in stream:
    # Regular content
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
How thinking tokens are billed varies by provider (check your provider's pricing page), but extended thinking with streaming is a practical way to get better answers on hard problems without making the wait feel longer.
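If you want to show or hide the thinking phase separately, you need to split reasoning deltas from answer deltas. Exactly where reasoning appears is provider-specific; the sketch below assumes a DeepSeek-style convention where OpenAI-compatible endpoints expose it as `delta.reasoning_content` (an assumption, so check your provider's docs), and uses plain dicts for illustration:

```python
def split_stream(chunks):
    """Separate reasoning tokens from answer tokens in a streamed response.

    Assumes reasoning arrives in delta["reasoning_content"] -- a convention
    used by some OpenAI-compatible providers, not a universal field name.
    """
    thinking, answer = [], []
    for chunk in chunks:
        delta = chunk["choices"][0]["delta"]
        # Reasoning and answer content arrive in different delta fields
        if delta.get("reasoning_content"):
            thinking.append(delta["reasoning_content"])
        if delta.get("content"):
            answer.append(delta["content"])
    return "".join(thinking), "".join(answer)
```

In a UI you'd typically render the `thinking` stream in a collapsible panel and the `answer` stream in the main chat bubble.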
Stream Any AI Model Through One API
KissAPI gives you OpenAI-compatible streaming for Claude, GPT-5, DeepSeek, Qwen, and 50+ models. One endpoint, one API key, full streaming support.
Quick Reference: Streaming Checklist
- Set `stream: true` in your request
- Read `chunk.choices[0].delta.content` (not `.message.content`)
- Handle `null` content chunks (they happen between content blocks)
- Watch for `finish_reason: "stop"` or `[DONE]` to know when it's over
- Wrap the full stream loop in error handling, not just the initial call
- Set a timeout on first-chunk arrival (5s is reasonable)
- Collect chunks into a list if you need the full text later
- Close the stream early if the response goes sideways
Streaming is one of those features that's simple in concept but has enough edge cases to trip you up in production. Get the basics right, handle errors properly, and your users will never have to stare at a loading spinner again.