FastAPI + Claude API ストリーミングバックエンド — SSE・リトライ・エラー復旧 実践ガイド
FastAPIとAnthropic SDKでプロダクションレベルのストリーミングAIバックエンドを構築する完全ガイド。SSEストリーミングエンドポイント実装、レートリミット指数バックオフリトライ、エラー分類戦略、トークンストリーミング最適化、Dockerコンテナデプロイをステップごとにコード例付きで解説します。
AIバックエンドを構築していると、必ず一つの問いに突き当たる。「レスポンスが全部生成されるまでユーザーを待たせてもいいのか?」答えはほとんどの場合「ノー」だ。特にClaudeのような言語モデルが長いテキストを生成するとき、全体が完成してから一気に返す方式はUXを壊す。
実際のサービスに組み込んでみて感じたのは、ストリーミング自体は難しくないということだ。本当の問題はその周辺にある。レートリミットに引っかかったときどうするか、エラーをどう分類してそれぞれ違う処理をするか、NginxのうしろでSSEを正しく流すにはどのヘッダーが必要か。この記事はFastAPI 0.136とAnthropic SDK 0.97をベースに、その実践パターンを自分で実装・検証した結果をまとめたものだ。
始める前に必要なもの
- Python 3.11以上(3.12推奨)
- Anthropic APIキー(
ANTHROPIC_API_KEY) - 基本的なFastAPI / asyncioの知識
依存関係は4つだけ:
pip install fastapi uvicorn anthropic httpx
Python環境の構成が初めてなら、uvでPython AI開発環境をセットアップする方法を先に読むといい。仮想環境と依存関係の衝突問題をきれいに解決してくれる。
Step 1: プロジェクト構造と基本設定
まずディレクトリを整理する:
claude-streaming-api/
├── main.py # FastAPIアプリ + エンドポイント
├── retry.py # リトライロジック
├── .env # APIキー(gitignore)
├── Dockerfile
└── docker-compose.yml
main.pyの基本骨格:
import os
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI(title="Claude Streaming API", version="1.0.0")
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
class ChatRequest(BaseModel):
message: str
max_tokens: int = 1024
system: str = "You are a helpful assistant."
PydanticのBaseModelでリクエストスキーマを定義すると、FastAPIが自動で入力バリデーションとOpenAPIドキュメントを生成する。下の画像のようにSwagger UIが自動生成されるのを確認できる。
ローカルでuvicorn main:app --reloadを実行すると、/docsのSwagger UIから直接テストできる。この手軽さがFastAPIを選んだ主な理由の一つだ。
Step 2: SSEストリーミングエンドポイントの実装
Server-Sent Events(SSE)はHTTP上で単方向リアルタイムストリームを送る最もシンプルな方法だ。WebSocketより実装が簡単で、Claudeのようにサーバーからクライアントへテキストを流すパターンにぴったり合う。
ポイントはFastAPIのStreamingResponseとAnthropic SDKのstream()コンテキストマネージャを組み合わせることだ:
import asyncio
import json
from typing import AsyncGenerator
async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
"""Claude API ストリーミング → SSEイベントジェネレーター"""
try:
with client.messages.stream(
model="claude-opus-4-7-20251101",
max_tokens=request.max_tokens,
system=request.system,
messages=[{"role": "user", "content": request.message}],
) as stream:
for text in stream.text_stream:
# SSEフォーマット: "data: {...}\n\n"
yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
except anthropic.RateLimitError:
yield f"data: {json.dumps({'type': 'error', 'error': 'rate_limit', 'retry_after': 30})}\n\n"
except anthropic.AuthenticationError:
yield f"data: {json.dumps({'type': 'error', 'error': 'auth_error'})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'error': 'unknown', 'message': str(e)})}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
return StreamingResponse(
stream_claude(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Nginxバッファリング無効化必須
},
)
実際のSSEレスポンスストリームをcurlでテストするとこう流れる:
$ curl -sN -X POST http://localhost:8000/chat/stream \
-H "Content-Type: application/json" \
-d '{"message": "FastAPIとClaudeを組み合わせる方法を教えて"}'
data: {"type": "delta", "text": "FastAPI"}
data: {"type": "delta", "text": "と "}
data: {"type": "delta", "text": "Claude"}
...
data: {"type": "done"}
SSEイベントのフォーマット規則はシンプルだ:data: プレフィックス + JSON + 2回の改行(\n\n)。このフォーマットさえ守れば、ブラウザのEventSource APIや大抵のSSEクライアントが自動でパースしてくれる。
一点注意:anthropic.Anthropic()クライアントのmessages.stream()は同期コンテキストマネージャだ。非同期FastAPIルート内でブロッキングなしに実行するには、AsyncAnthropicを使う方が正確だ:
client = anthropic.AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
async with client.messages.stream(...) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"
AsyncAnthropicを使えばuvicornのイベントループをブロックしない。トラフィックが少ない初期プロジェクトでは同期クライアントも問題なく動くが、プロダクションでは非同期クライアントが正解だ。
Step 3: エラー分類とリトライ戦略
AI APIのエラーを全部同じ方法で処理してはいけない。エラーごとに正しい行動が違うからだ:
| エラー種別 | 分類 | 正しい行動 |
|---|---|---|
RateLimitError | rate_limit | 指数バックオフ後にリトライ |
AuthenticationError | auth_error | 即失敗、APIキー確認 |
BadRequestError | token_limit | 即失敗、メッセージ削減 |
APIConnectionError | network_error | 制限付きリトライ |
| その他 | unknown | 即失敗、ログ記録 |
レートリミットとネットワークエラーのみリトライする指数バックオフ関数:
MAX_RETRIES = 3
BASE_DELAY = 1.0 # seconds
async def call_with_retry(fn, *args, **kwargs):
"""指数バックオフリトライ — rate_limitとnetwork_errorのみリトライ"""
for attempt in range(MAX_RETRIES):
try:
return await fn(*args, **kwargs)
except anthropic.RateLimitError as e:
if attempt == MAX_RETRIES - 1:
raise # 最後の試みでも失敗なら伝播
delay = BASE_DELAY * (2 ** attempt)
print(f"[retry] rate_limit, waiting {delay}s (attempt {attempt + 1}/{MAX_RETRIES})")
await asyncio.sleep(delay)
except anthropic.APIConnectionError:
if attempt == MAX_RETRIES - 1:
raise
await asyncio.sleep(BASE_DELAY * (2 ** attempt))
except (anthropic.AuthenticationError, anthropic.BadRequestError):
raise # リトライしても意味のないエラーはすぐ伝播
このパターンを自分でテストしたとき、2回失敗後に3回目で成功するflakyなAPIをシミュレーションした結果、Result: success (after 3 attempts)で正常動作を確認した。
正直に言うと、リトライロジックで一番気になる部分はMAX_RETRIESとBASE_DELAYの値だ。レートリミットはAnthropicのプランによって異なり、リトライ間隔が短すぎると同じレートリミットにまた引っかかる。自分はAPIプランに応じてこれらの値を環境変数で外部化することを推奨している。
Step 4: ヘルスチェックとプロダクションデプロイ
KubernetesやECSのようなコンテナ環境ではヘルスチェックエンドポイントが必須だ:
import time
@app.get("/health")
async def health_check():
"""K8s readiness / liveness probe用"""
return {"status": "ok", "timestamp": time.time()}
Dockerイメージ:
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
NginxリバースプロキシでSSEを正しく流すには、バッファリングを必ず無効にする必要がある:
location /chat/stream {
proxy_pass http://backend:8000;
proxy_buffering off; # SSEバッファリング無効化必須
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
proxy_read_timeout 300s; # 長いストリーミングセッション許容
chunked_transfer_encoding on;
}
proxy_buffering offを忘れると、Nginxがストリームをすべてバッファにためてからまとめて返す。それはストリーミングではなく、ただの遅いレスポンスになる。この設定は初めてSSEをNginxの後ろにつける人がほぼ確実に一度は経験する問題だ。
Step 5: クライアント連携: ブラウザEventSourceとPython
ブラウザ(JavaScript):
// EventSourceはGET専用 — POSTリクエストはfetch + ReadableStream必要
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: 'こんにちは' }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n\n').filter(l => l.startsWith('data:'));
for (const line of lines) {
const data = JSON.parse(line.slice(6));
if (data.type === 'delta') {
outputElement.textContent += data.text;
}
}
}
Python(httpx):
import httpx
import json
async def stream_chat(message: str):
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
"http://localhost:8000/chat/stream",
json={"message": message},
timeout=60.0,
) as response:
async for line in response.aiter_lines():
if line.startswith("data:"):
event = json.loads(line[6:])
if event["type"] == "delta":
print(event["text"], end="", flush=True)
Vercel AI SDKを使うフロントエンドがある場合、Vercel AI SDKでClaudeストリーミングエージェントを作るを参照するとフロントエンド連携をより速く進められる。useChatフックがSSEのパースを代わりにやってくれて、クライアントコードがずっとシンプルになる。
残念な点と実際に詰まるポイント
このスタックを実際のプロジェクトで使って感じた限界を正直に整理する。
一つ目、ストリーミングとプロンプトキャッシュの組み合わせが難しい。 Claude APIのプロンプトキャッシュは入力トークンコストを大きく削減する。しかしストリーミングとキャッシュを同時に使うとき、キャッシュヒット有無をストリーム途中に知ることができない。ストリーミング完了後のusageオブジェクトで確認できるが、リアルタイムでキャッシュ状態を反映するUIが必要なら実装が複雑になる。Claude APIプロンプトキャッシュでコスト最適化する方法でキャッシュ戦略を事前に把握しておくといい。
二つ目、uvicornのワーカー数とコネクション管理が思ったより複雑だ。 SSEは接続を長く保持する。--workers 4で4ワーカーを使うなら、同時に最大4つの長いストリーミング接続しかできない。実際のトラフィックがこれを超えるとリクエストが待機する。Kubernetesで水平スケールするか、gunicorn + uvicorn worker classの組み合わせが必要だ。
三つ目、リトライロジックがストリーミング途中に入ると処理が複雑になる。 ストリーミングが半分進んだときにネットワークエラーが起きたらどうするか。最初からリクエストし直すと、クライアントはすでに受け取ったテキストが重複する。実用的な解決策はクライアント側でlast-event-idを管理し、サーバーがそれを受け取って続きから生成することだが、この実装はこの記事の範囲を超える。
このパターンはストリーミングレスポンスが不要な大量処理シナリオにはオーバーエンジニアリングだ。1,000件のドキュメントをバッチ処理するならAnthropic Message Batches APIの方がずっと安くて適切だ。
トラブルシューティング FAQ
Q: SSEがクライアントに届かず一気に来る
Nginxのproxy_buffering offが抜けている場合がほとんどだ。もしくはContent-Type: text/event-streamヘッダーがなければブラウザがSSEとして認識しない。
Q: asyncio.CancelledErrorが断続的に発生する
クライアントがストリーミング途中で接続を切ると、FastAPIがジェネレーターをキャンセルする。stream_claudeの中でexcept asyncio.CancelledError: returnを処理するときれいに終了できる。
Q: RuntimeError: Event loop is closed エラー
同期のanthropic.Anthropic()クライアントを非同期コンテキストで使うと発生することがある。anthropic.AsyncAnthropic()に置き換えるのが根本解決策だ。
Q: レートリミットに引っかかり、リトライしても失敗し続ける
BASE_DELAYが短すぎるか、短時間にリクエストが集中するバーストトラフィックが原因だ。AnthropicのRate LimitsページでプランごとのTPM/RPM上限を確認し、BASE_DELAYを最低5秒以上に上げることを推奨する。
いつ使い、いつ避けるべきか
ストリーミングバックエンドを無条件にSSE + FastAPIで組むのが正解とは限らない。実際に運用した経験から、選択基準を整理する。
このスタックが本領を発揮する場面:
- Pythonチームがすでにあり、新しい言語スタック導入コストを避けたいとき
- ストリーミングがコアUX要素であるAIチャット、コード生成、文書作成サービス
- OpenAPIドキュメント自動化とPydanticバリデーションが必要なチーム
- 既存のFastAPIまたはDjango RESTバックエンドにAI機能を段階的に追加する状況
避けたほうがよい場面:
- レスポンスを一括で受け取ってもUXに支障がない短い分類・抽出タスク。この場合は単純なリクエスト-レスポンスのほうがコードもシンプルでデバッグも楽だ。
- 1,000件以上のドキュメントを一括処理するバッチ作業。ストリーミングは意味がなく、Anthropic Message Batches APIがコスト面で半分程度になる。
- 双方向リアルタイム操作(タイピングインジケーター、同時編集)が必要な場合。SSEは単方向なのでWebSocketが適切だ。
- ローカル・オンプレミス環境で外部API呼び出し自体がブロックされている場合。まずはセルフホストモデルが先になる。セルフホストの選択肢はOllamaとFastAPIでプロダクションデプロイする方法で扱った。
つまり「長い出力 + リアルタイム表示」という二つの条件が同時に成立するときだけ、このパターンの複雑さが正当化される。どちらか一方が欠ければ、より単純な方法がある。
一次ソースと参考資料
この記事のコードは以下の公式ドキュメントを基準に作成・検証した。バージョンが上がると動作が変わることがあるので、実装前に一度確認することを推奨する。
- FastAPI公式ドキュメント — Custom Response / StreamingResponse:
StreamingResponseのジェネレーター動作とキャンセル処理に関する公式説明。 - Anthropic — Streaming Messages: Claude APIのSSEストリーミングイベント構造とSDKごとの使い方。
- MDN — Using server-sent events: SSEイベントフォーマット(
data:、event:、id:、retry:)とEventSourceAPIの標準定義。
型安全なリクエストスキーマをより厳密にしたいなら、Pydantic AIで型安全なエージェントを作るも併せて読むと役立つ。
正直に言うと、このスタックがすべての状況で最善ではない。Node.jsチームならVercel AI SDKの方が速く、大規模なリアルタイム接続が必要ならWebSocketやgRPC Streamingが良い選択肢になる。しかしPython AIバックエンドを素早く立ち上げたいなら、このパターンは自分が実際に検証した最も実用的な出発点だ。
次のステップとしては、プロンプトキャッシュを適用してコストを下げ、ストリーミングレスポンスにOpenTelemetryトレーシングをつけてレイテンシとトークン使用量を可視化する作業を推奨する。
よくある質問
FastAPIでClaude APIのストリーミングをどう実装しますか?
SSEストリーミングでレートリミットとエラー復旧はどう扱いますか?
本番デプロイで何を考慮すべきですか?
他の言語で読む
- 🇰🇷 한국어
- 🇯🇵 日本語(現在のページ)
- 🇺🇸 English
- 🇨🇳 中文
この記事は役に立ちましたか?
より良いコンテンツを作成するための力になります。コーヒー一杯で応援してください。