为什么要流式
一次 LLM 请求通常要 2-30 秒。如果你等全部生成完再返回,用户看到的就是"转圈 10 秒,然后啪一下一大段文字"。换成流式:
用户在 500ms 之后就看到文字一点点吐出来,这是个巨大的体验升级。所有面向终端用户的 chat UI 都是流式。
stream=True:最简单的流式
from litellm import completion resp = completion( model="gpt-4o-mini", messages=[{"role": "user", "content": "写一首四句五言诗"}], stream=True, ) for chunk in resp: delta = chunk.choices[0].delta.content or "" print(delta, end="", flush=True) print()
这里发生了什么:
completion(stream=True)立即返回一个生成器,不是普通 response。- 每次
for chunk in拿到一个ModelResponseStream对象——结构和非流式的ModelResponse几乎一样,但message改成了delta。 delta.content是本次增量的文本。可能是 None(比如第一块只有 role、最后一块 finish_reason),所以要用or ""兜底。- 生成器迭代结束 = 整条流结束。
chunk 长什么样
打印第一个 chunk,你大概会看到:
ModelResponseStream( id='chatcmpl-abc', choices=[StreamingChoices( index=0, delta=Delta(content='', role='assistant', tool_calls=None), finish_reason=None )], created=1730000000, model='gpt-4o-mini', )
典型的一条完整流可能长这样(简化):
chunk #0 delta={role:"assistant", content:""} finish_reason=None
chunk #1 delta={content:"春"} finish_reason=None
chunk #2 delta={content:"眠"} finish_reason=None
chunk #3 delta={content:"不觉晓, "} finish_reason=None
chunk #4 delta={content:"处处闻啼鸟。"} finish_reason=None
...
chunk #N delta={content:""} finish_reason="stop"
每一家的 chunk 粒度不一样——OpenAI 通常一 chunk 1-3 token,Anthropic 的 chunk 更大;但 LiteLLM 已经把它们都归一成了上面这个 OpenAI 风格结构。你写一次 for 循环,到哪家都对。
重组完整文本
有时你一边流式吐给用户,一边还要保存完整答案入库。两种做法:
方法一:自己拼接
full = "" for chunk in resp: delta = chunk.choices[0].delta.content or "" full += delta print(delta, end="", flush=True) save_to_db(full)
方法二:stream_chunk_builder
LiteLLM 提供一个工具函数,把所有 chunks 合并成一个完整的 ModelResponse(带上 usage):
from litellm import stream_chunk_builder chunks = [] for chunk in resp: chunks.append(chunk) delta = chunk.choices[0].delta.content or "" print(delta, end="", flush=True) full = stream_chunk_builder(chunks) print(full.choices[0].message.content) # 完整文本 print(full.usage.completion_tokens) # 完整用量
优势:工具调用、finish_reason 等非文本字段也能被正确合并。流式场景里要拿到完整 usage,这是最可靠的办法。
流式里的 usage:不是默认给你的
一个常见坑:流式场景里,OpenAI 默认不给你 token 数。只有最后一个 chunk 是 finish_reason=stop,没有 usage 字段。要拿到 usage,得显式开关:
resp = completion( model="gpt-4o-mini", messages=msgs, stream=True, stream_options={"include_usage": True}, # 关键 ) for chunk in resp: if chunk.usage: print("用量:", chunk.usage) # 只有最后一个 chunk 有 usage
LiteLLM 有个更省心的模式——litellm.include_usage = True 全局打开,所有流式调用都带用量。推荐生产环境打开,否则你连本次调用花了多少钱都算不出来。
finish_reason:流为什么停了
流结束时最后一个 chunk 会带 finish_reason。可能的值:
full, finish = "", None for chunk in resp: c = chunk.choices[0] full += c.delta.content or "" if c.finish_reason: finish = c.finish_reason if finish == "length": logger.warning("模型输出被截断, 考虑提高 max_tokens")
异步:acompletion 的正确姿势
同步的 completion() 一次只能等一个响应——在 FastAPI 等异步 server 里用它会直接阻塞事件循环。必须换成 acompletion:
import asyncio from litellm import acompletion async def ask(model, q): resp = await acompletion( model=model, messages=[{"role":"user","content":q}], max_tokens=256, ) return resp.choices[0].message.content async def main(): # 5 个请求并发 tasks = [ask("gpt-4o-mini", f"第{i}题: 写一句吉祥话") for i in range(5)] results = await asyncio.gather(*tasks) for r in results: print(r) asyncio.run(main())
5 个请求几乎是同时发出去的——总耗时约等于最慢那一个。同步版本要 5 倍时间。
并发 + 流式:async for
async def ask_stream(model, q): resp = await acompletion( model=model, messages=[{"role":"user","content":q}], stream=True, max_tokens=256, ) async for chunk in resp: delta = chunk.choices[0].delta.content or "" print(delta, end="", flush=True) print()
两个关键区别:
await acompletion(..., stream=True)——要先await拿到异步迭代器- 用
async for,不是普通for
限流与信号量
OpenAI 的 Tier 1 账号通常限 500 RPM。你一口气 1000 个 gather,立刻 429。用 asyncio.Semaphore 控制并发:
import asyncio from litellm import acompletion SEM = asyncio.Semaphore(20) # 最多 20 个 in-flight async def ask(q): async with SEM: resp = await acompletion( model="gpt-4o-mini", messages=[{"role":"user","content":q}], max_tokens=256, timeout=30, ) return resp.choices[0].message.content async def main(): questions = [f"介绍中国第{i}位皇帝" for i in range(200)] results = await asyncio.gather(*[ask(q) for q in questions]) print(len(results)) asyncio.run(main())
这是个非常常见的批处理模式。第 7 章的 Router 会把限流和重试做得更优雅——这里你先手搓理解一下原理。
内置重试:num_retries
网络抖一下、上游 5xx、偶发超时,都不该让用户看到失败。LiteLLM 自带重试:
resp = completion( model="gpt-4o-mini", messages=msgs, num_retries=3, # 失败最多重试 3 次 timeout=30, )
默认只对可重试的错误重试(5xx、429、网络断),4xx 业务错误不会重试。退避策略是指数退避(1s, 2s, 4s ...)。
如果你需要对所有错误都重试(不推荐但偶尔有用):
resp = completion( model="gpt-4o-mini", messages=msgs, num_retries=3, retry_policy={"RateLimitErrorRetries": 5, "InternalServerErrorRetries": 3, "BadRequestErrorRetries": 0}, # 4xx 别重 )
httpx 连接复用
一个隐藏性能点:默认情况下,LiteLLM 每次请求会新建 HTTP 连接(特别是老版本)。短间隔大量调用时,握手成本明显。办法是传入自己管理的 httpx client:
import httpx from litellm import acompletion # 全局复用一个 async client, 跟着 app 生命周期 client = httpx.AsyncClient( limits=httpx.Limits(max_connections=100, max_keepalive_connections=50), timeout=httpx.Timeout(60), ) async def ask(q): resp = await acompletion( model="gpt-4o-mini", messages=[{"role":"user","content":q}], client=client, # 复用 ) return resp.choices[0].message.content
不是每家 provider 都完全支持这个优化——OpenAI 走的是官方 sdk 的 httpx,Anthropic 是官方 sdk,Gemini 是它家的 sdk——LiteLLM 在内部管理连接池,一般不用手动干预。但在超高吞吐场景(>1k QPS),手动给 client 是必要的。
流式工具调用:魔鬼在细节
最容易踩坑的一块:模型在流式里决定调工具,参数是一个字符一个字符流出来的 JSON 字符串。你不能简单拼一下就 json.loads,中间任何一个 chunk 都是不完整的 JSON。
from litellm import completion, stream_chunk_builder tools = [{ "type": "function", "function": { "name": "get_weather", "description": "返回某城市天气", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, }] resp = completion( model="gpt-4o", messages=[{"role":"user","content":"北京今天天气"}], tools=tools, stream=True, ) chunks = [] for chunk in resp: chunks.append(chunk) # UI 层只显示 content, 不管 tool_calls delta = chunk.choices[0].delta.content or "" print(delta, end="", flush=True) # 流结束后, 用 stream_chunk_builder 安全组装 full = stream_chunk_builder(chunks) tcs = full.choices[0].message.tool_calls if tcs: for tc in tcs: print(f"调用 {tc.function.name}(", tc.function.arguments, ")") # arguments 此时是完整的 JSON 字符串, 可以 json.loads
流式工具调用的最佳实践:流式显示 content 给用户,但 tool_calls 只在 stream_chunk_builder 合并后读——不要在 chunk 里自己拼 JSON。
性能实测:批处理 100 个分类请求
把异步 + 限流 + 重试 整合起来,做个小 benchmark:
import asyncio, time from litellm import acompletion SEM = asyncio.Semaphore(20) async def classify(text): async with SEM: r = await acompletion( model="gpt-4o-mini", messages=[ {"role":"system","content":"仅输出 bug/feature/billing/other 之一"}, {"role":"user","content":text}, ], temperature=0, max_tokens=5, num_retries=3, timeout=15, ) return r.choices[0].message.content.strip() async def main(): tickets = [f"user ticket #{i} : my app crashed" for i in range(100)] t = time.time() res = await asyncio.gather(*[classify(t) for t in tickets]) print(f"100 tickets, {time.time()-t:.2f}s") print(set(res)) asyncio.run(main())
实测在 Tier 1 账号 + 信号量 20 的情况下,100 个请求大约 10-15 秒完成。同步版本需要 50-80 秒。这就是异步的力量。
在 FastAPI 里的标准模板
生产里经常要把 LiteLLM 的流输出透传给前端。FastAPI 的 StreamingResponse + SSE 是标准搭配:
from fastapi import FastAPI from fastapi.responses import StreamingResponse from litellm import acompletion import json app = FastAPI() async def sse_stream(body): resp = await acompletion( model=body["model"], messages=body["messages"], stream=True, max_tokens=1024, ) async for chunk in resp: delta = chunk.choices[0].delta.content or "" if delta: yield f"data: {json.dumps({'delta': delta})}\n\n" yield "data: [DONE]\n\n" @app.post("/chat") async def chat(body: dict): return StreamingResponse(sse_stream(body), media_type="text/event-stream")
前端用 EventSource 或 fetch() + ReadableStream 接收。如果你走 WebSocket,也是一样的思路——把 yield 改成 ws.send_json。
mock_response 也能流式
写测试的时候别忘了:
resp = completion( model="gpt-4o-mini", messages=msgs, mock_response="春眠不觉晓, 处处闻啼鸟。", stream=True, ) for chunk in resp: print(chunk.choices[0].delta.content or "", end="", flush=True)
LiteLLM 会把 mock 字符串切成一串 chunk 模拟真实 SSE 流。测 UI 非常爽。
常见坑汇总
- for chunk 忘了判空:
chunk.choices[0].delta.content可能为None,直接print会输出 "None" 字符串。永远or ""。 - 在 FastAPI 的 sync endpoint 里用 completion:事件循环被阻塞,整个服务会卡。改成
async def+acompletion。 - 流式时做
json.loads(chunk...):chunk 里 tool_calls 的 arguments 是部分字符串,解析必然失败。用stream_chunk_builder。 - usage 是 None:流式默认不返回,打开
stream_options={"include_usage": True}。 - timeout 太短:复杂推理(o1/R1)可能 60 秒才回,默认 timeout 可能不够。显式设 90-120。
- Semaphore 忘记用 context manager:
async with SEM:必须包裹,不然异常发生时 permit 漏还,很快死锁。 - 前端看不到实时输出:检查是否有 nginx/网关开了 buffer。SSE 要加
X-Accel-Buffering: no头。
本章小结
stream=True返回生成器,for chunk in ...迭代,每个 chunk 有delta.content- 要拿完整 usage 必须开
stream_options={"include_usage": True} acompletion+await+async for是异步 + 流式的组合asyncio.Semaphore控并发,num_retries做自动重试- 流式工具调用必须用
stream_chunk_builder合并,不要自己拼 JSON - FastAPI +
StreamingResponse+ SSE 是前后端一条龙的标准套路