How to Stream AI API Responses in Python and Node.js (2026 Guide)

Nobody wants to stare at a blank screen for 8 seconds waiting for Claude Opus to finish thinking. Streaming fixes that. Instead of waiting for the entire response to generate, you get tokens as they're produced — word by word, in real time.

This isn't just a UX nicety. Streaming changes how you build AI-powered apps. You can cancel bad responses early (saving tokens and money), show progress indicators, and build chat interfaces that feel responsive even when the model is generating a 2,000-word answer.

This guide covers everything: the protocol under the hood, production-ready code for Python and Node.js, error handling patterns, and how to pipe streams to a frontend. All examples use the OpenAI-compatible API format, so they work with GPT-5, Claude, DeepSeek, Qwen — any model behind an OpenAI-compatible endpoint.

How Streaming Actually Works

When you set stream: true in your API request, the server switches from a normal HTTP response to Server-Sent Events (SSE). Instead of one big JSON blob at the end, you get a series of small chunks, each containing one or a few tokens.

Each chunk looks like this:

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

Three things to notice. First, each chunk has a delta instead of a message — it's the incremental piece, not the full text. Second, finish_reason stays null until the model is done. Third, the stream ends with data: [DONE].

The first token usually arrives in 200-500ms (depending on the model and provider), compared to 3-15 seconds for a non-streamed response. That's the difference between "fast" and "is this thing broken?"
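
To make the format concrete, here's a minimal stdlib-only parser applied to those sample lines; it's what the SDKs do for you under the hood (a sketch: real SSE parsing also handles comments, multi-line data fields, and partial reads):

```python
import json

# The sample SSE lines from above, as raw strings off the wire
raw_lines = [
    'data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}',
    'data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}',
    'data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}',
    "data: [DONE]",
]

def parse_sse(lines):
    """Extract the delta text from each chunk and join it."""
    parts = []
    for line in lines:
        payload = line.removeprefix("data: ")
        if payload == "[DONE]":
            break  # end-of-stream sentinel, not JSON
        chunk = json.loads(payload)
        delta = chunk["choices"][0]["delta"]
        if delta.get("content"):  # the final chunk has an empty delta
            parts.append(delta["content"])
    return "".join(parts)

text = parse_sse(raw_lines)  # "Hello world"
```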

Python: Streaming with the OpenAI SDK

The OpenAI Python SDK handles SSE parsing for you. Here's the minimal version:

from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://api.kissapi.ai/v1"  # works with any OpenAI-compatible endpoint
)

stream = client.chat.completions.create(
    model="claude-sonnet-4-6",
    messages=[{"role": "user", "content": "Write a Python function to merge two sorted lists"}],
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)

print()  # newline at the end

That flush=True matters. Without it, Python buffers the output and you lose the real-time effect in the terminal.

Collecting the Full Response

Usually you want to stream to the user AND keep the full text for logging or further processing:

full_response = []

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        full_response.append(content)
        print(content, end="", flush=True)

complete_text = "".join(full_response)
# Now you have the full response for logging, DB storage, etc.

Async Streaming

If you're building a web server with FastAPI or similar, you'll want the async version:

from openai import AsyncOpenAI

client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://api.kissapi.ai/v1"
)

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="claude-sonnet-4-6",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content

This is an async generator you can plug directly into a FastAPI StreamingResponse:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat")
async def chat(q: str):
    return StreamingResponse(
        stream_response(q),
        media_type="text/plain"
    )

Node.js: Streaming with the OpenAI SDK

Same idea in JavaScript. The OpenAI Node SDK returns an async iterable:

import OpenAI from "openai";

const client = new OpenAI({
  apiKey: "your-api-key",
  baseURL: "https://api.kissapi.ai/v1",
});

const stream = await client.chat.completions.create({
  model: "claude-sonnet-4-6",
  messages: [{ role: "user", content: "Explain event loops in Node.js" }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content);
  }
}

console.log(); // newline

Express + SSE to the Browser

Here's how to pipe a streaming AI response to a browser using Server-Sent Events:

import express from "express";
import OpenAI from "openai";

const app = express();
const client = new OpenAI({
  apiKey: process.env.API_KEY,
  baseURL: "https://api.kissapi.ai/v1",
});

app.get("/api/chat", async (req, res) => {
  const { prompt } = req.query;

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const stream = await client.chat.completions.create({
    model: "claude-sonnet-4-6",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  // If the browser disconnects, stop iterating. Breaking out of the
  // for await loop aborts the underlying request and stops generation.
  let clientGone = false;
  req.on("close", () => { clientGone = true; });

  for await (const chunk of stream) {
    if (clientGone) break;
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      res.write(`data: ${JSON.stringify({ text: content })}\n\n`);
    }
  }

  res.write("data: [DONE]\n\n");
  res.end();
});

app.listen(3000);

On the frontend, consume it with EventSource or a fetch-based reader:

const response = await fetch(`/api/chat?prompt=${encodeURIComponent(prompt)}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

outer: while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // stream: true keeps partial multi-byte characters buffered in the decoder
  buffer += decoder.decode(value, { stream: true });

  // SSE events can be split across reads, so only process complete lines
  const lines = buffer.split("\n");
  buffer = lines.pop(); // keep the (possibly partial) last line for the next read
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const data = line.slice(6);
    if (data === "[DONE]") break outer; // a plain break would only exit this inner loop
    const { text: token } = JSON.parse(data);
    document.getElementById("output").textContent += token;
  }
}

Error Handling That Won't Bite You

Streams fail differently than regular requests. The connection can drop mid-response, the server can send malformed chunks, or you might hit rate limits after the stream has started. Here's a pattern that handles all of these:

import time
from openai import OpenAI, APIError, APIConnectionError, RateLimitError

client = OpenAI(api_key="your-key", base_url="https://api.kissapi.ai/v1")

def stream_with_retry(messages, model="claude-sonnet-4-6", max_retries=3):
    for attempt in range(max_retries):
        full_text = []  # tokens yielded so far, useful for logging on failure
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True
            )
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_text.append(content)
                    yield content
            return  # success, exit retry loop

        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # out of retries; don't end the stream silently
            wait = 2 ** attempt
            print(f"\nRate limited. Waiting {wait}s...")
            time.sleep(wait)

        except APIConnectionError:
            if attempt < max_retries - 1:
                # Caveat: tokens already yielded will be yielded again on
                # retry. Deduplicate downstream if duplicates matter.
                print("\nConnection dropped. Retrying...")
                time.sleep(1)
            else:
                raise

        except APIError as e:
            print(f"\nAPI error: {e}")
            raise

The key insight: wrap the entire stream consumption in the try block, not just the initial request. Connection drops happen mid-stream, not at the start.
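
To see why, here's a toy reproduction with no API involved. Creating the "stream" succeeds instantly; the failure only surfaces while iterating:

```python
def flaky_stream():
    """Simulated stream: two chunks arrive, then the connection drops."""
    yield "Hello"
    yield " wor"
    raise ConnectionError("connection reset mid-stream")

received = []
try:
    stream = flaky_stream()    # this line never raises
    for token in stream:       # the ConnectionError surfaces here
        received.append(token)
except ConnectionError:
    pass  # received holds a partial response: ["Hello", " wor"]
```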

Streaming vs. Non-Streaming: When to Use Which

| Use Streaming When | Skip Streaming When |
| --- | --- |
| Building chat interfaces | Batch processing (no user waiting) |
| Long responses (>500 tokens) | Short responses (classification, yes/no) |
| You want early cancellation | You need the full response before processing |
| Real-time UX matters | JSON mode (some providers don't stream JSON well) |
| Showing "typing" indicators | Function calling with complex tool use |

One gotcha: streaming responses don't include usage data (token counts) in every provider's implementation. If you need exact token counts for billing, you might need to count them yourself or make a separate call. Some providers like OpenAI now include usage in the final chunk — check your provider's docs.
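
If you do end up assembling text and usage yourself, the logic is easy to sketch against plain dicts shaped like the wire format shown earlier (with include_usage enabled, OpenAI-style providers send usage in a final chunk whose choices list is empty; verify against your provider):

```python
def consume_stream(chunks):
    """Collect delta text and usage from OpenAI-style chunk dicts."""
    parts, usage = [], None
    for chunk in chunks:
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                parts.append(content)
        if chunk.get("usage"):  # final chunk when include_usage is set
            usage = chunk["usage"]
    return "".join(parts), usage

# Mock chunks in the wire format from the top of this guide
mock = [
    {"choices": [{"delta": {"content": "Hi"}}]},
    {"choices": [{"delta": {"content": " there"}}]},
    {"choices": [], "usage": {"prompt_tokens": 5, "completion_tokens": 2, "total_tokens": 7}},
]
text, usage = consume_stream(mock)  # text == "Hi there"
```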

Performance Tips

A few things I've learned from running streaming in production:

  1. Set reasonable timeouts. The first chunk should arrive within 5 seconds for most models. If it doesn't, something's wrong. Set a timeout on the initial connection, not on the full stream.
  2. Buffer before rendering. Don't update the DOM on every single token. Batch 3-5 tokens and render them together. The user won't notice the difference, but your browser will thank you.
  3. Use stream_options for usage data. If your provider supports it, pass stream_options={"include_usage": True} to get token counts in the final chunk.
  4. Cancel early when you can. If the user navigates away or the response is clearly wrong, abort the stream. Every token you don't generate is money saved.
# Python: cancel a stream early
stream = client.chat.completions.create(
    model="claude-sonnet-4-6",
    messages=messages,
    stream=True
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        if "ERROR" in content or should_cancel():
            stream.close()  # stop generating, stop paying
            break
        print(content, end="", flush=True)
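
Tip 2 is easy to package as a small generator you can wrap around any token stream; this is a sketch, not tied to any particular SDK:

```python
def batch_tokens(stream, batch_size=4):
    """Group incoming tokens into batches so the UI renders less often."""
    buffer = []
    for token in stream:
        buffer.append(token)
        if len(buffer) >= batch_size:
            yield "".join(buffer)
            buffer = []
    if buffer:  # flush whatever is left when the stream ends
        yield "".join(buffer)

# Ten single-character tokens become three render calls
batches = list(batch_tokens(iter("streaming!"), batch_size=4))
# batches == ["stre", "amin", "g!"]
```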

Streaming with Extended Thinking

Claude's extended thinking mode works with streaming too. The model first streams its reasoning (thinking tokens), then streams the final answer. You can show or hide the thinking phase in your UI:

# Extended thinking tokens come as a separate content block
# The SDK handles this — thinking chunks have a different type
stream = client.chat.completions.create(
    model="claude-sonnet-4-6",
    messages=messages,
    stream=True,
    extra_body={"thinking": {"type": "enabled", "budget_tokens": 5000}}
)

for chunk in stream:
    # Regular content (guard against chunks with an empty choices list)
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

How thinking tokens are billed varies by provider, so check the pricing docs rather than assuming they're cheaper than regular output tokens. Even so, extended thinking pairs well with streaming on hard problems: the user watches progress while the model reasons instead of staring at a spinner.

Stream Any AI Model Through One API

KissAPI gives you OpenAI-compatible streaming for Claude, GPT-5, DeepSeek, Qwen, and 50+ models. One endpoint, one API key, full streaming support.

Start Free →

Quick Reference: Streaming Checklist

  1. Set stream: true in your request
  2. Read chunk.choices[0].delta.content (not .message.content)
  3. Handle null content chunks (they happen between content blocks)
  4. Watch for finish_reason: "stop" or [DONE] to know when it's over
  5. Wrap the full stream loop in error handling, not just the initial call
  6. Set a timeout on first-chunk arrival (5s is reasonable)
  7. Collect chunks into a list if you need the full text later
  8. Close the stream early if the response goes sideways

Streaming is one of those features that's simple in concept but has enough edge cases to trip you up in production. Get the basics right, handle errors properly, and your users will never have to stare at a loading spinner again.