FastAPI + Claude API 流式生产后端构建 — SSE、重试与错误恢复实战指南

FastAPI + Claude API 流式生产后端构建 — SSE、重试与错误恢复实战指南

本文是面向后端开发者的FastAPI + Anthropic SDK生产级流式AI后端完整指南。涵盖SSE流式端点实现、限速指数退避重试机制、错误分类与恢复策略、令牌流优化,以及基于Docker的完整容器化部署方案,每步均附完整可运行Python代码示例,是快速掌握并部署流式AI后端的最佳实践指南。

在构建AI后端时,你终究会遇到同一个问题:“能让用户等到完整响应生成完再返回吗?“大多数情况下,答案是否定的。当Claude这样的语言模型生成长文本时,缓冲所有内容再一次性发送会彻底破坏用户体验。

在实际项目中集成这套方案后,我发现流式传输本身并不难。真正的挑战在它的周边:遇到限速怎么办、如何分类错误并分别处理、在Nginx背后让SSE正常流动需要哪些请求头。本文基于FastAPI 0.136和Anthropic SDK 0.97,整理了亲自实现和验证过的生产实践模式。

前置条件

  • Python 3.11及以上(推荐3.12)
  • Anthropic API密钥(ANTHROPIC_API_KEY
  • 基本的FastAPI / asyncio概念

只需要四个依赖项:

pip install fastapi uvicorn anthropic httpx

如果是第一次配置Python环境,可以先看用uv配置Python AI开发环境。它能干净地解决虚拟环境和依赖冲突问题。

Step 1:项目结构与基本配置

首先整理目录布局:

claude-streaming-api/
├── main.py          # FastAPI应用 + 端点
├── retry.py         # 重试逻辑
├── .env             # API密钥(gitignore)
├── Dockerfile
└── docker-compose.yml

main.py的基本骨架:

import os
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="Claude Streaming API", version="1.0.0")

client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))


class ChatRequest(BaseModel):
    message: str
    max_tokens: int = 1024
    system: str = "You are a helpful assistant."

用Pydantic的BaseModel定义请求模式,FastAPI会自动生成输入验证和OpenAPI文档。如下图所示,Swagger UI会自动生成。

FastAPI Swagger UI — Claude Streaming API端点

本地运行uvicorn main:app --reload后,打开/docs就能通过Swagger UI直接测试。这种便利性是选择FastAPI的主要原因之一。

Step 2:SSE流式端点实现

Server-Sent Events(SSE)是在HTTP上发送单向实时流最简单的方式。比WebSocket实现更简单,非常适合像Claude这样从服务器向客户端流式传输文本的模式。

关键是将FastAPI的StreamingResponse与Anthropic SDK的stream()上下文管理器结合使用:

import asyncio
import json
from typing import AsyncGenerator


async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
    """Claude API流式传输 → SSE事件生成器"""
    try:
        with client.messages.stream(
            model="claude-opus-4-7-20251101",
            max_tokens=request.max_tokens,
            system=request.system,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            for text in stream.text_stream:
                # SSE格式:"data: {...}\n\n"
                yield f"data: {json.dumps({'text': text, 'type': 'delta'}, ensure_ascii=False)}\n\n"

            yield f"data: {json.dumps({'type': 'done'})}\n\n"

    except anthropic.RateLimitError:
        yield f"data: {json.dumps({'type': 'error', 'error': 'rate_limit', 'retry_after': 30})}\n\n"
    except anthropic.AuthenticationError:
        yield f"data: {json.dumps({'type': 'error', 'error': 'auth_error'})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'type': 'error', 'error': 'unknown', 'message': str(e)})}\n\n"


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        stream_claude(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用Nginx缓冲——关键配置
        },
    )

用curl测试真实服务器的SSE响应流:

$ curl -sN -X POST http://localhost:8000/chat/stream \
       -H "Content-Type: application/json" \
       -d '{"message": "解释FastAPI和Claude集成"}'

data: {"type": "delta", "text": "FastAPI"}
data: {"type": "delta", "text": "和 "}
data: {"type": "delta", "text": "Claude"}
...
data: {"type": "done"}

SSE事件的格式规则很简单:data: 前缀 + JSON + 两次换行(\n\n)。遵守这个格式,浏览器的EventSource API或大多数SSE客户端会自动解析。

需要注意:anthropic.Anthropic()messages.stream()是同步上下文管理器。在异步FastAPI路由中要避免阻塞uvicorn的事件循环,最好使用AsyncAnthropic

client = anthropic.AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
    async with client.messages.stream(...) as stream:
        async for text in stream.text_stream:
            yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"

使用AsyncAnthropic不会阻塞uvicorn的事件循环。流量不大的早期项目用同步客户端也能正常运行,但生产环境应该使用异步客户端。

Step 3:错误分类与重试策略

不要用同样的方式处理所有AI API错误。每种错误类型需要不同的应对方式:

错误类型分类正确行动
RateLimitErrorrate_limit指数退避后重试
AuthenticationErrorauth_error立即失败,检查API密钥
BadRequestErrortoken_limit立即失败,缩短消息
APIConnectionErrornetwork_error有限次重试
其他unknown立即失败,记录日志

只对限速和网络错误进行重试的指数退避函数:

MAX_RETRIES = 3
BASE_DELAY = 1.0  # 秒


async def call_with_retry(fn, *args, **kwargs):
    """指数退避重试 — 仅对rate_limit和network_error重试"""
    for attempt in range(MAX_RETRIES):
        try:
            return await fn(*args, **kwargs)
        except anthropic.RateLimitError as e:
            if attempt == MAX_RETRIES - 1:
                raise
            delay = BASE_DELAY * (2 ** attempt)
            print(f"[retry] rate_limit, waiting {delay}s (attempt {attempt + 1}/{MAX_RETRIES})")
            await asyncio.sleep(delay)
        except anthropic.APIConnectionError:
            if attempt == MAX_RETRIES - 1:
                raise
            await asyncio.sleep(BASE_DELAY * (2 ** attempt))
        except (anthropic.AuthenticationError, anthropic.BadRequestError):
            raise  # 重试无意义的错误立即传播

我在本地测试这个模式时——模拟一个失败两次后成功的不稳定API——结果是Result: success (after 3 attempts),退避逻辑正常工作。

说实话,这里最让我不确定的是MAX_RETRIESBASE_DELAY的值。限速阈值因Anthropic计划不同而不同,重试间隔太短会再次触发同一限速。建议根据API计划将这些值外部化为环境变量。

Step 4:健康检查与生产部署

在Kubernetes或ECS等容器环境中,健康检查端点是必需的:

import time


@app.get("/health")
async def health_check():
    """用于K8s readiness / liveness probe"""
    return {"status": "ok", "timestamp": time.time()}

Docker镜像:

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Nginx反向代理中,必须禁用缓冲才能让SSE正常流动:

location /chat/stream {
    proxy_pass         http://backend:8000;
    proxy_buffering    off;           # 关键:禁用SSE缓冲
    proxy_cache        off;
    proxy_set_header   Connection     '';
    proxy_http_version 1.1;
    proxy_read_timeout 300s;          # 允许长流式会话
    chunked_transfer_encoding on;
}

忘记proxy_buffering off意味着Nginx会把整个流收集到缓冲区,然后一次性发送。这不是流式传输,只是一个慢响应。这是第一次将SSE放在Nginx后面的人几乎必然会遇到的问题。

Step 5:客户端集成 — 浏览器EventSource与Python

浏览器(JavaScript)

// EventSource仅支持GET — POST请求需要fetch + ReadableStream
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: '你好!' }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  
  const text = decoder.decode(value);
  const lines = text.split('\n\n').filter(l => l.startsWith('data:'));
  
  for (const line of lines) {
    const data = JSON.parse(line.slice(6));
    if (data.type === 'delta') {
      outputElement.textContent += data.text;
    }
  }
}

Python(httpx)

import httpx
import json

async def stream_chat(message: str):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": message},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    event = json.loads(line[6:])
                    if event["type"] == "delta":
                        print(event["text"], end="", flush=True)

如果你有使用Vercel AI SDK的前端,用Vercel AI SDK构建Claude流式代理展示了如何在前端侧连接这套方案。useChat钩子负责处理SSE解析,客户端代码会简单得多。

不足之处与实际会卡住的地方

以下是我在实际项目中使用这套技术栈时遇到的真实限制。

第一,流式传输与提示词缓存的组合比较棘手。 Claude的提示词缓存能显著降低输入令牌成本。但同时使用流式传输和缓存时,在流传输中途无法知道是否命中缓存。可以在流式传输完成后从usage对象中确认,但如果需要实时反映缓存状态的UI,实现会变得复杂。建议先阅读Claude API提示词缓存成本优化,在架构设计时就考虑缓存策略。

第二,uvicorn的worker数量和连接管理比想象中复杂。 SSE会长时间保持连接。使用--workers 4时,同时最多只能处理4个长流式连接。实际流量超过这个数量时请求会排队。需要在Kubernetes上水平扩展,或者使用gunicorn + uvicorn worker class组合。

第三,重试逻辑在流式传输中途介入时处理复杂。 流式传输进行到一半时发生网络错误怎么办?从头重新请求意味着客户端会收到重复文本。实用的解决方案是客户端管理last-event-id,让服务器接收后从断点继续生成,但这个实现超出了本文的范围。

对于不需要流式传输的批量处理场景,这个模式也是过度设计。如果要批量处理1000个文档,Anthropic Message Batches API要便宜和合适得多。

故障排除FAQ

Q:SSE不是流式到达,而是一次性全部到来

大多数情况是Nginx缺少proxy_buffering off。另外,如果没有Content-Type: text/event-stream响应头,浏览器不会将其识别为SSE。

Q:偶发的asyncio.CancelledError

客户端在流式传输中途断开连接时,FastAPI会取消生成器。在stream_claude中添加except asyncio.CancelledError: return可以干净地退出。

Q:RuntimeError: Event loop is closed错误

在异步上下文中使用同步的anthropic.Anthropic()客户端可能会发生这种情况。切换到anthropic.AsyncAnthropic()是根本解决方案。

Q:触发限速,重试一直失败

要么BASE_DELAY太短,要么突发流量集中在同一时间窗口。查看Anthropic的限速页面确认你计划的TPM/RPM上限,将BASE_DELAY至少设为5秒以上。

总结:何时选择这套技术栈

FastAPI + AsyncAnthropic + uvicorn组合适合以下情况:

  • 团队已有Python能力,想避免引入新语言栈的成本
  • 流式传输是核心用户体验的AI聊天、代码生成、文档写作服务
  • 需要OpenAPI文档自动化和Pydantic验证的团队

说实话,这套技术栈并非在所有情况下都是最佳选择。如果是Node.js团队,Vercel AI SDK上手更快;如果需要大规模实时并发连接,WebSocket或gRPC Streaming可能是更好的选择。但对于想快速启动Python AI流式后端的人来说,这是我亲自验证过的最实用的起点。

下一步建议:应用提示词缓存降低成本,为流式响应添加OpenTelemetry追踪,让延迟和令牌使用量可见。

阅读其他语言版本

这篇文章有帮助吗?

您的支持能帮助我创作更好的内容。请我喝杯咖啡吧。

关于作者

jw

Kim Jangwook

AI/LLM专业全栈开发者

凭借10年以上的Web开发经验,构建AI代理系统、LLM应用程序和自动化解决方案。分享Claude Code、MCP和RAG系统的实践经验。

返回博客列表