FastAPI + Claude API: Production Streaming API — SSE & Retry
A production guide to streaming with FastAPI and the Anthropic SDK, covering SSE endpoints, exponential backoff retry, error classification, and Docker deployment.
When building an AI backend, you eventually hit the same question: “Can I make users wait until the whole response is generated?” Most of the time the answer is no. When a model like Claude is producing a long piece of text, buffering everything and sending it all at once kills the UX.
Having integrated this into actual services, what I found is that streaming itself isn’t the hard part. The real complexity is around it. What to do when you hit a rate limit. How to classify errors so each one gets handled differently. Which headers you need to make SSE flow properly behind Nginx. This guide covers those production patterns, implemented and tested against FastAPI 0.136 and Anthropic SDK 0.97.
What You Need Before Starting
- Python 3.11 or later (3.12 recommended)
- Anthropic API key (ANTHROPIC_API_KEY)
- Basic understanding of FastAPI and asyncio
You only need four dependencies:
pip install fastapi uvicorn anthropic httpx
If you’re new to Python environment setup, setting up a Python AI development environment with uv is a good first read. It cleanly solves virtual environment and dependency conflict issues.
Step 1: Project Structure and Basic Setup
Start with a clean directory layout:
claude-streaming-api/
├── main.py # FastAPI app + endpoints
├── retry.py # retry logic
├── requirements.txt # dependencies installed by the Dockerfile
├── .env # API key (gitignored)
├── Dockerfile
└── docker-compose.yml
The skeleton of main.py:
import os

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="Claude Streaming API", version="1.0.0")
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))


class ChatRequest(BaseModel):
    message: str
    max_tokens: int = 1024
    system: str = "You are a helpful assistant."
Defining the request schema with Pydantic’s BaseModel gives you automatic input validation and OpenAPI docs from FastAPI for free.
Running uvicorn main:app --reload locally and opening /docs gives you a live Swagger UI you can test directly. That convenience is one of the main reasons I reach for FastAPI.
Step 2: Implementing the SSE Streaming Endpoint
Server-Sent Events (SSE) is the simplest way to push a one-directional real-time stream over HTTP. It’s simpler to implement than WebSocket and fits perfectly for the pattern of streaming text from server to client, which is exactly what Claude does.
The key is combining FastAPI’s StreamingResponse with Anthropic SDK’s stream() context manager:
import asyncio
import json
from typing import AsyncGenerator


async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
    """Claude API streaming → SSE event generator"""
    try:
        with client.messages.stream(
            model="claude-opus-4-7-20251101",
            max_tokens=request.max_tokens,
            system=request.system,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            for text in stream.text_stream:
                # SSE format: "data: {...}\n\n"
                yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"
        yield f"data: {json.dumps({'type': 'done'})}\n\n"
    except anthropic.RateLimitError:
        yield f"data: {json.dumps({'type': 'error', 'error': 'rate_limit', 'retry_after': 30})}\n\n"
    except anthropic.AuthenticationError:
        yield f"data: {json.dumps({'type': 'error', 'error': 'auth_error'})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'type': 'error', 'error': 'unknown', 'message': str(e)})}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        stream_claude(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering (critical)
        },
    )
Testing with curl against a live server, the SSE stream looks like this:
$ curl -sN -X POST http://localhost:8000/chat/stream \
    -H "Content-Type: application/json" \
    -d '{"message": "Explain FastAPI and Claude integration"}'
data: {"text": "FastAPI", "type": "delta"}
data: {"text": " and ", "type": "delta"}
data: {"text": "Claude", "type": "delta"}
...
data: {"type": "done"}
The SSE format rules are simple: data: prefix + JSON + two newlines (\n\n). Follow that format and the browser’s EventSource API or most SSE clients will parse it automatically.
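The framing rule can be captured in a pair of small helpers. This is a minimal sketch rather than part of the guide's code: the names `sse_event` and `parse_sse_data` are hypothetical, and the parser only handles the single-line `data:` payloads used in this guide.

```python
import json


def sse_event(payload: dict) -> str:
    """Serialize a dict as one SSE event: 'data: <json>' plus a blank line."""
    # json.dumps escapes newlines inside strings, so the payload stays on one line
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse_data(raw: str) -> list[dict]:
    """Split a raw SSE body into events and decode each 'data:' payload."""
    events = []
    for block in raw.split("\n\n"):
        if block.startswith("data:"):
            events.append(json.loads(block[len("data:"):].strip()))
    return events


body = sse_event({"text": "Fast", "type": "delta"}) + sse_event({"type": "done"})
decoded = parse_sse_data(body)
```

Serializing through `json.dumps` matters because a raw newline inside a text delta would otherwise end the event early.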
One thing to watch: anthropic.Anthropic()’s messages.stream() is a synchronous context manager. To avoid blocking uvicorn’s event loop inside an async FastAPI route, use AsyncAnthropic instead:
client = anthropic.AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))


async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
    async with client.messages.stream(...) as stream:
        async for text in stream.text_stream:
            yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"
With AsyncAnthropic, every await hands control back to the event loop, so one slow stream doesn’t stall other requests. The sync client works fine for low-traffic early-stage projects, but production warrants the async client.
Step 3: Error Classification and Retry Strategy
Don’t handle all AI API errors the same way. Each error type calls for a different response:
| Error Type | Classification | Correct Action |
|---|---|---|
| RateLimitError | rate_limit | Retry with exponential backoff |
| AuthenticationError | auth_error | Fail immediately, check API key |
| BadRequestError | token_limit | Fail immediately, shorten message |
| APIConnectionError | network_error | Retry with limits |
| Other | unknown | Fail immediately, log the event |
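The table can be expressed as a lookup that returns both the classification and whether a retry makes sense. This is a sketch under assumptions: `ErrorPolicy`, `ERROR_POLICIES`, and `classify_error` are hypothetical names, and matching on class names is a simplification so the example runs without the SDK installed. In real code, `isinstance` checks against the `anthropic` exception classes would be the more direct route.

```python
from typing import NamedTuple


class ErrorPolicy(NamedTuple):
    category: str
    retryable: bool


# Keys are the Anthropic SDK exception class names listed in the table.
ERROR_POLICIES = {
    "RateLimitError": ErrorPolicy("rate_limit", True),
    "AuthenticationError": ErrorPolicy("auth_error", False),
    "BadRequestError": ErrorPolicy("token_limit", False),
    "APIConnectionError": ErrorPolicy("network_error", True),
}


def classify_error(exc: BaseException) -> ErrorPolicy:
    """Walk the exception's class hierarchy and return the first matching policy."""
    for cls in type(exc).__mro__:
        policy = ERROR_POLICIES.get(cls.__name__)
        if policy is not None:
            return policy
    return ErrorPolicy("unknown", False)


# Stand-in class so the example runs without the SDK (hypothetical).
class RateLimitError(Exception):
    pass


rate_limit_policy = classify_error(RateLimitError("429"))
fallback_policy = classify_error(ValueError("boom"))
```

Walking the class hierarchy means a subclass of a listed error inherits its parent's policy instead of falling through to unknown.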
An exponential backoff function that only retries rate limits and network errors:
import asyncio

import anthropic

MAX_RETRIES = 3
BASE_DELAY = 1.0  # seconds


async def call_with_retry(fn, *args, **kwargs):
    """Exponential backoff retry, only for rate_limit and network_error"""
    for attempt in range(MAX_RETRIES):
        try:
            return await fn(*args, **kwargs)
        except anthropic.RateLimitError:
            if attempt == MAX_RETRIES - 1:
                raise  # Out of attempts: let the caller see the rate limit
            delay = BASE_DELAY * (2 ** attempt)
            print(f"[retry] rate_limit, waiting {delay}s (attempt {attempt + 1}/{MAX_RETRIES})")
            await asyncio.sleep(delay)
        except anthropic.APIConnectionError:
            if attempt == MAX_RETRIES - 1:
                raise
            await asyncio.sleep(BASE_DELAY * (2 ** attempt))
        except (anthropic.AuthenticationError, anthropic.BadRequestError):
            raise  # No point retrying these, so propagate immediately
I tested this pattern locally against a flaky API that fails twice before succeeding. The result was Result: success (after 3 attempts). Backoff worked as expected.
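A test of that shape can be reproduced without touching the real API. The sketch below is an assumption about how such a check might look, not the original test: `TransientError`, `make_flaky`, and `retry` are hypothetical stand-ins, and the delay is shortened so the run finishes quickly.

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a retryable failure such as a rate limit (hypothetical)."""


def make_flaky(failures_before_success: int):
    """Return an async function that fails N times, then succeeds, plus its call log."""
    calls = {"count": 0}

    async def flaky():
        calls["count"] += 1
        if calls["count"] <= failures_before_success:
            raise TransientError(f"simulated failure #{calls['count']}")
        return f"success (after {calls['count']} attempts)"

    return flaky, calls


async def retry(fn, max_retries: int = 3, base_delay: float = 0.01):
    """Same backoff shape as call_with_retry, with a short delay to keep the run fast."""
    for attempt in range(max_retries):
        try:
            return await fn()
        except TransientError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))


flaky, calls = make_flaky(failures_before_success=2)
result = asyncio.run(retry(flaky))
print("Result:", result)
```

With two simulated failures and three allowed attempts, the third call succeeds. A function that fails three times would exhaust the budget and re-raise.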
Honestly, the part of this I’m most uncertain about is the MAX_RETRIES and BASE_DELAY values. Rate limits differ per Anthropic plan, and if your retry interval is too short, you’ll hit the same rate limit again. I’d recommend externalizing these values as environment variables based on your API plan.
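One way to externalize those values is sketched below. The environment variable names `CLAUDE_MAX_RETRIES` and `CLAUDE_BASE_DELAY` and both function names are hypothetical; the defaults match the constants used in this guide.

```python
import os
from typing import Mapping


def load_retry_config(env: Mapping[str, str] = os.environ) -> tuple[int, float]:
    """Read retry settings from the environment, falling back to the guide's defaults."""
    max_retries = int(env.get("CLAUDE_MAX_RETRIES", "3"))
    base_delay = float(env.get("CLAUDE_BASE_DELAY", "1.0"))
    if max_retries < 1 or base_delay <= 0:
        raise ValueError("CLAUDE_MAX_RETRIES must be >= 1 and CLAUDE_BASE_DELAY > 0")
    return max_retries, base_delay


def backoff_schedule(max_retries: int, base_delay: float) -> list[float]:
    """Delays slept between attempts; the final attempt raises instead of sleeping."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries - 1)]


MAX_RETRIES, BASE_DELAY = load_retry_config()
```

Printing `backoff_schedule(MAX_RETRIES, BASE_DELAY)` at startup is a cheap way to confirm the total wait fits inside your plan's rate limit window.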
Step 4: Health Checks and Production Deployment
In container environments like Kubernetes or ECS, a health check endpoint is non-negotiable:
import time


@app.get("/health")
async def health_check():
    """For K8s readiness / liveness probes"""
    return {"status": "ok", "timestamp": time.time()}
Docker image:
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
For Nginx reverse proxy, you must disable buffering to let SSE flow properly:
location /chat/stream {
    proxy_pass http://backend:8000;
    proxy_buffering off; # Critical: disable SSE buffering
    proxy_cache off;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    proxy_read_timeout 300s; # Allow long streaming sessions
    chunked_transfer_encoding on;
}
Leaving out proxy_buffering off means Nginx collects the entire stream in its buffer and sends it all at once. That’s not streaming. It’s just a slow response. Nearly everyone makes this mistake the first time they put SSE behind Nginx.
Step 5: Client Integration: Browser EventSource and Python
Browser (JavaScript):
// EventSource is GET-only, so for POST requests use fetch + ReadableStream
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: 'Hello!' }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = ''; // holds a partial event until its terminating blank line arrives
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters intact across chunk boundaries
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split('\n\n');
  buffer = events.pop(); // the last piece may be incomplete; keep it for the next read
  for (const line of events.filter(l => l.startsWith('data:'))) {
    const data = JSON.parse(line.slice(6));
    if (data.type === 'delta') {
      outputElement.textContent += data.text;
    }
  }
}
Python (httpx):
import httpx
import json


async def stream_chat(message: str):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": message},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    event = json.loads(line[6:])
                    if event["type"] == "delta":
                        print(event["text"], end="", flush=True)
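The server in this guide also emits `done` and `error` events, which a client usually wants to handle alongside `delta`. Here is a minimal sketch of that dispatch as a pure function over already-received lines, so it can be exercised without a network. `StreamError` and `collect_text` are hypothetical names.

```python
import json
from typing import Iterable


class StreamError(Exception):
    """Raised when the server sends an event with type 'error' (hypothetical name)."""


def collect_text(lines: Iterable[str]) -> str:
    """Accumulate delta text until 'done'; raise if the server reports an error."""
    parts = []
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip blank separator lines and anything that is not a data field
        event = json.loads(line[len("data:"):].strip())
        if event["type"] == "delta":
            parts.append(event["text"])
        elif event["type"] == "error":
            raise StreamError(event.get("error", "unknown"))
        elif event["type"] == "done":
            break
    return "".join(parts)


sample = [
    'data: {"text": "Fast", "type": "delta"}',
    "",
    'data: {"text": "API", "type": "delta"}',
    "",
    'data: {"type": "done"}',
]
full_text = collect_text(sample)
```

Raising on `error` events lets the caller decide whether to retry, for example when the error code is `rate_limit`.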
If you have a frontend using the Vercel AI SDK, building a Claude streaming agent with the Vercel AI SDK shows how to wire this up on the frontend side. The useChat hook handles SSE parsing for you, which makes client-side code much simpler.
Limitations and Where You’ll Actually Get Stuck
Here are the honest limitations I hit when using this stack in real projects.
First, combining streaming with prompt caching is tricky. Claude’s prompt caching reduces input token costs significantly. But when you iterate text_stream as in this guide, you only receive text deltas, so you can’t tell mid-stream whether the cache was hit. The usage object is available on the final message once streaming completes, but if you need to reflect cache status in real time, the implementation gets complex. Read Claude API prompt caching cost optimization before you design your architecture around caching.
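Second, uvicorn worker count and connection management is more involved than it looks. SSE keeps connections open for a long time. If the handler blocks the event loop, as the synchronous client does, each worker serves one stream at a time, so --workers 4 caps you at roughly 4 concurrent streaming connections and further requests queue. With AsyncAnthropic a single worker can interleave many streams, but each open connection still holds a socket and memory for its full duration. Once real traffic outgrows one machine, you’ll need horizontal scaling on Kubernetes or the gunicorn + uvicorn worker class combination.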
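How much concurrency a single worker gets depends on whether the handler blocks its event loop. The sketch below simulates four streams on one loop, once with a blocking wait and once with an awaitable one. The function names are hypothetical, and the sleeps stand in for waiting on the upstream API.

```python
import asyncio
import time


async def blocking_stream(delay: float) -> None:
    time.sleep(delay)  # holds the event loop; nothing else runs meanwhile


async def async_stream(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the event loop while waiting


async def run_concurrently(handler, n: int, delay: float) -> float:
    """Run n simulated streams at once on one event loop and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(run_concurrently(blocking_stream, 4, 0.05))
async_elapsed = asyncio.run(run_concurrently(async_stream, 4, 0.05))
```

The blocking variant takes roughly four times the single delay because the streams run one after another, while the awaitable variant overlaps them.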
Third, retry logic mid-stream is a hard problem. What do you do when a network error hits halfway through a stream? Restarting the request from scratch means the client gets duplicate text. The practical solution is having the client track last-event-id so the server can resume. That implementation is outside this guide’s scope, but worth planning for early.
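To make the resume idea concrete, here is a rough sketch of the server-side bookkeeping. It assumes the server keeps the chunks it has already generated, which a real deployment would need a store for, and the names `format_event` and `replay_from` are hypothetical. The `id:` field is part of the SSE format. A browser `EventSource` sends the last seen id back in a `Last-Event-ID` header when it reconnects, whereas a fetch-based client has to send it itself.

```python
import json
from typing import Optional


def format_event(event_id: int, payload: dict) -> str:
    """SSE event with an id field, so the client can report where it stopped."""
    return f"id: {event_id}\ndata: {json.dumps(payload)}\n\n"


def replay_from(chunks: list[str], last_event_id: Optional[int]) -> list[str]:
    """Return events after last_event_id; ids are 1-based positions in chunks."""
    start = last_event_id or 0
    return [
        format_event(i, {"text": text, "type": "delta"})
        for i, text in enumerate(chunks, start=1)
        if i > start
    ]


generated = ["Fast", "API", " and ", "Claude"]
first_attempt = replay_from(generated, None)[:2]  # connection drops after event 2
resumed = replay_from(generated, last_event_id=2)
```

Because the resumed stream starts at event 3, the client can append it to what it already rendered without duplicating text.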
This pattern is also overkill for bulk processing where streaming isn’t the point. If you’re processing 1,000 documents in batch, the Anthropic Message Batches API is far cheaper and more appropriate.
Troubleshooting FAQ
Q: SSE arrives all at once instead of streaming
proxy_buffering off is missing from Nginx in most cases. Also check that the Content-Type: text/event-stream header is present. Without it, browsers won’t recognize the response as SSE.
Q: Intermittent asyncio.CancelledError
When a client disconnects mid-stream, FastAPI cancels the generator. Adding except asyncio.CancelledError: return inside stream_claude exits cleanly.
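The cancellation path can be observed in isolation with plain asyncio. In this sketch, `fake_stream` stands in for `stream_claude` and cancelling the consumer task stands in for a client disconnect. Both are assumptions for illustration, not FastAPI internals.

```python
import asyncio

cleanup_log = []


async def fake_stream():
    """Simulated token stream that exits quietly when its consumer is cancelled."""
    try:
        for i in range(100):
            await asyncio.sleep(0.01)  # cancellation is delivered at this await
            yield f"chunk-{i}"
    except asyncio.CancelledError:
        cleanup_log.append("cancelled")  # release resources or log here
        return


async def consume():
    received = []
    async for chunk in fake_stream():
        received.append(chunk)
    return received


async def main():
    task = asyncio.create_task(consume())
    await asyncio.sleep(0.035)  # let a few chunks through
    task.cancel()  # simulates the client going away mid-stream
    return await task


received = asyncio.run(main())
```

Catching the exception and returning ends the generator without a traceback. If you only need cleanup, a `finally` block does the same job without swallowing the cancellation.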
Q: RuntimeError: Event loop is closed
This can happen when using the synchronous anthropic.Anthropic() client inside an async context. Switching to anthropic.AsyncAnthropic() is the root fix.
Q: Rate limited, retries keep failing
Either BASE_DELAY is too short or burst traffic is hammering the same window. Check Anthropic’s Rate Limits page for your plan’s TPM/RPM limits and set BASE_DELAY to at least 5 seconds.
When to Use It and When to Avoid It
Reaching for SSE + FastAPI isn’t always the right call. Here’s how I decide, based on actually shipping with it.
This stack earns its place when:
- You have a Python team and want to avoid the cost of adopting a new language stack
- Streaming is a core UX element — AI chat, code generation, document drafting
- You want OpenAPI documentation auto-generation and Pydantic validation out of the box
- You’re adding AI features incrementally to an existing FastAPI or Django REST backend
You’re better off avoiding it when:
- The task is a short classification or extraction where receiving the answer all at once doesn’t hurt UX. A plain request-response is simpler to write and easier to debug.
- You’re batch-processing 1,000+ documents. Streaming buys you nothing, and the Anthropic Message Batches API runs at roughly half the cost.
- You need bidirectional real-time interaction (typing indicators, collaborative editing). SSE is one-directional, so WebSocket is the right tool.
- You’re in a local or on-prem environment where outbound API calls are blocked. Self-hosting a model comes first there. I covered the self-hosted route in deploying a production backend with Ollama and FastAPI.
In short, the complexity of this pattern is only justified when “long output” and “real-time display” hold at the same time. Drop either one and a simpler approach exists.
Primary Sources and Further Reading
The code here was written and tested against the following official docs. Behavior can change across versions, so it’s worth checking these before you implement.
- FastAPI docs: Custom Response / StreamingResponse. The official explanation of how StreamingResponse handles generators and cancellation.
- Anthropic: Streaming Messages. The SSE streaming event structure for the Claude API and per-SDK usage.
- MDN: Using server-sent events. The standard definition of the SSE event format (data:, event:, id:, retry:) and the EventSource API.
If you want stricter, type-safe request schemas, building type-safe agents with Pydantic AI is a useful companion read.
To be honest, this isn’t the right stack for every situation. If you have a Node.js team, the Vercel AI SDK is faster to ship. If you need massive concurrent real-time connections, WebSocket or gRPC Streaming might be better. But for getting a Python AI streaming backend running quickly, this is the most practical starting point I’ve personally verified.
Next steps: apply prompt caching to cut costs, add OpenTelemetry tracing to your streaming responses, and make latency and token usage visible.