Chapter 04

流式与异步

"流式"让你第一个 token 出来就能显示,用户体感快十倍。"异步"让你一个进程同时服务成百上千个请求。LiteLLM 把这两件事都抽象得像 OpenAI。

为什么要流式

一次 LLM 请求通常要 2-30 秒。如果你等全部生成完再返回,用户看到的就是"转圈 10 秒,然后啪一下一大段文字"。换成流式:

~500ms
首 token 到达时间 TTFT
30-100
tokens/s 稳态速率
10x
感知速度提升

用户在 500ms 之后就看到文字一点点吐出来,这是个巨大的体验升级。所有面向终端用户的 chat UI 都是流式。

stream=True:最简单的流式

from litellm import completion

resp = completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "写一首四句五言诗"}],
    stream=True,
)

for chunk in resp:
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)
print()

这里发生了什么:

  1. completion(stream=True) 立即返回一个生成器,不是普通 response。
  2. 每次 for chunk in 拿到一个 ModelResponseStream 对象——结构和非流式的 ModelResponse 几乎一样,但 message 改成了 delta
  3. delta.content 是本次增量的文本。可能是 None(比如第一块只有 role、最后一块 finish_reason),所以要用 or "" 兜底。
  4. 生成器迭代结束 = 整条流结束。

chunk 长什么样

打印第一个 chunk,你大概会看到:

ModelResponseStream(
  id='chatcmpl-abc',
  choices=[StreamingChoices(
    index=0,
    delta=Delta(content='', role='assistant', tool_calls=None),
    finish_reason=None
  )],
  created=1730000000,
  model='gpt-4o-mini',
)

典型的一条完整流可能长这样(简化):

chunk #0   delta={role:"assistant", content:""}       finish_reason=None
chunk #1   delta={content:"春"}                        finish_reason=None
chunk #2   delta={content:"眠"}                        finish_reason=None
chunk #3   delta={content:"不觉晓, "}                  finish_reason=None
chunk #4   delta={content:"处处闻啼鸟。"}                finish_reason=None
...
chunk #N   delta={content:""}                          finish_reason="stop"

每一家的 chunk 粒度不一样——OpenAI 通常一 chunk 1-3 token,Anthropic 的 chunk 更大;但 LiteLLM 已经把它们都归一成了上面这个 OpenAI 风格结构。你写一次 for 循环,到哪家都对

重组完整文本

有时你一边流式吐给用户,一边还要保存完整答案入库。两种做法:

方法一:自己拼接

full = ""
for chunk in resp:
    delta = chunk.choices[0].delta.content or ""
    full += delta
    print(delta, end="", flush=True)

save_to_db(full)

方法二:stream_chunk_builder

LiteLLM 提供一个工具函数,把所有 chunks 合并成一个完整的 ModelResponse(带上 usage):

from litellm import stream_chunk_builder

chunks = []
for chunk in resp:
    chunks.append(chunk)
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)

full = stream_chunk_builder(chunks)
print(full.choices[0].message.content)    # 完整文本
print(full.usage.completion_tokens)         # 完整用量

优势:工具调用、finish_reason 等非文本字段也能被正确合并。流式场景里要拿到完整 usage,这是最可靠的办法

流式里的 usage:不是默认给你的

一个常见坑:流式场景里,OpenAI 默认不给你 token 数。只有最后一个 chunk 是 finish_reason=stop,没有 usage 字段。要拿到 usage,得显式开关:

resp = completion(
    model="gpt-4o-mini",
    messages=msgs,
    stream=True,
    stream_options={"include_usage": True},   # 关键
)

for chunk in resp:
    if chunk.usage:
        print("用量:", chunk.usage)   # 只有最后一个 chunk 有 usage

LiteLLM 有个更省心的模式——litellm.include_usage = True 全局打开,所有流式调用都带用量。推荐生产环境打开,否则你连本次调用花了多少钱都算不出来。

finish_reason:流为什么停了

流结束时最后一个 chunk 会带 finish_reason。可能的值:

stop
模型自然结束,说完了。
length
达到 max_tokens 被截断。生产代码要检查这个状态——内容没说完。
tool_calls
模型要求调用工具。需要你执行函数再把结果回填给模型(第 5 章)。
content_filter
被 Azure/OpenAI 的审核拦了。常见于 user 输入了敏感词。
full, finish = "", None
for chunk in resp:
    c = chunk.choices[0]
    full += c.delta.content or ""
    if c.finish_reason:
        finish = c.finish_reason

if finish == "length":
    logger.warning("模型输出被截断, 考虑提高 max_tokens")

异步:acompletion 的正确姿势

同步的 completion() 一次只能等一个响应——在 FastAPI 等异步 server 里用它会直接阻塞事件循环。必须换成 acompletion:

import asyncio
from litellm import acompletion

async def ask(model, q):
    resp = await acompletion(
        model=model,
        messages=[{"role":"user","content":q}],
        max_tokens=256,
    )
    return resp.choices[0].message.content

async def main():
    # 5 个请求并发
    tasks = [ask("gpt-4o-mini", f"第{i}题: 写一句吉祥话") for i in range(5)]
    results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

asyncio.run(main())

5 个请求几乎是同时发出去的——总耗时约等于最慢那一个。同步版本要 5 倍时间。

并发 + 流式:async for

async def ask_stream(model, q):
    resp = await acompletion(
        model=model,
        messages=[{"role":"user","content":q}],
        stream=True,
        max_tokens=256,
    )
    async for chunk in resp:
        delta = chunk.choices[0].delta.content or ""
        print(delta, end="", flush=True)
    print()

两个关键区别:

限流与信号量

OpenAI 的 Tier 1 账号通常限 500 RPM。你一口气 1000 个 gather,立刻 429。用 asyncio.Semaphore 控制并发:

import asyncio
from litellm import acompletion

SEM = asyncio.Semaphore(20)   # 最多 20 个 in-flight

async def ask(q):
    async with SEM:
        resp = await acompletion(
            model="gpt-4o-mini",
            messages=[{"role":"user","content":q}],
            max_tokens=256,
            timeout=30,
        )
        return resp.choices[0].message.content

async def main():
    questions = [f"介绍中国第{i}位皇帝" for i in range(200)]
    results = await asyncio.gather(*[ask(q) for q in questions])
    print(len(results))

asyncio.run(main())

这是个非常常见的批处理模式。第 7 章的 Router 会把限流和重试做得更优雅——这里你先手搓理解一下原理。

内置重试:num_retries

网络抖一下、上游 5xx、偶发超时,都不该让用户看到失败。LiteLLM 自带重试:

resp = completion(
    model="gpt-4o-mini",
    messages=msgs,
    num_retries=3,            # 失败最多重试 3 次
    timeout=30,
)

默认只对可重试的错误重试(5xx、429、网络断),4xx 业务错误不会重试。退避策略是指数退避(1s, 2s, 4s ...)。

如果你需要对所有错误都重试(不推荐但偶尔有用):

resp = completion(
    model="gpt-4o-mini",
    messages=msgs,
    num_retries=3,
    retry_policy={"RateLimitErrorRetries": 5,
                  "InternalServerErrorRetries": 3,
                  "BadRequestErrorRetries": 0},   # 4xx 别重
)

httpx 连接复用

一个隐藏性能点:默认情况下,LiteLLM 每次请求会新建 HTTP 连接(特别是老版本)。短间隔大量调用时,握手成本明显。办法是传入自己管理的 httpx client:

import httpx
from litellm import acompletion

# 全局复用一个 async client, 跟着 app 生命周期
client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=50),
    timeout=httpx.Timeout(60),
)

async def ask(q):
    resp = await acompletion(
        model="gpt-4o-mini",
        messages=[{"role":"user","content":q}],
        client=client,           # 复用
    )
    return resp.choices[0].message.content

不是每家 provider 都完全支持这个优化——OpenAI 走的是官方 sdk 的 httpx,Anthropic 是官方 sdk,Gemini 是它家的 sdk——LiteLLM 在内部管理连接池,一般不用手动干预。但在超高吞吐场景(>1k QPS),手动给 client 是必要的。

流式工具调用:魔鬼在细节

最容易踩坑的一块:模型在流式里决定调工具,参数是一个字符一个字符流出来的 JSON 字符串。你不能简单拼一下就 json.loads,中间任何一个 chunk 都是不完整的 JSON。

from litellm import completion, stream_chunk_builder

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "返回某城市天气",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}]

resp = completion(
    model="gpt-4o",
    messages=[{"role":"user","content":"北京今天天气"}],
    tools=tools,
    stream=True,
)

chunks = []
for chunk in resp:
    chunks.append(chunk)
    # UI 层只显示 content, 不管 tool_calls
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)

# 流结束后, 用 stream_chunk_builder 安全组装
full = stream_chunk_builder(chunks)
tcs = full.choices[0].message.tool_calls
if tcs:
    for tc in tcs:
        print(f"调用 {tc.function.name}(", tc.function.arguments, ")")
        # arguments 此时是完整的 JSON 字符串, 可以 json.loads

流式工具调用的最佳实践:流式显示 content 给用户,但 tool_calls 只在 stream_chunk_builder 合并后读——不要在 chunk 里自己拼 JSON。

性能实测:批处理 100 个分类请求

把异步 + 限流 + 重试 整合起来,做个小 benchmark:

import asyncio, time
from litellm import acompletion

SEM = asyncio.Semaphore(20)

async def classify(text):
    async with SEM:
        r = await acompletion(
            model="gpt-4o-mini",
            messages=[
                {"role":"system","content":"仅输出 bug/feature/billing/other 之一"},
                {"role":"user","content":text},
            ],
            temperature=0,
            max_tokens=5,
            num_retries=3,
            timeout=15,
        )
        return r.choices[0].message.content.strip()

async def main():
    tickets = [f"user ticket #{i} : my app crashed" for i in range(100)]
    t = time.time()
    res = await asyncio.gather(*[classify(t) for t in tickets])
    print(f"100 tickets, {time.time()-t:.2f}s")
    print(set(res))

asyncio.run(main())

实测在 Tier 1 账号 + 信号量 20 的情况下,100 个请求大约 10-15 秒完成。同步版本需要 50-80 秒。这就是异步的力量

在 FastAPI 里的标准模板

生产里经常要把 LiteLLM 的流输出透传给前端。FastAPI 的 StreamingResponse + SSE 是标准搭配:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from litellm import acompletion
import json

app = FastAPI()

async def sse_stream(body):
    resp = await acompletion(
        model=body["model"],
        messages=body["messages"],
        stream=True,
        max_tokens=1024,
    )
    async for chunk in resp:
        delta = chunk.choices[0].delta.content or ""
        if delta:
            yield f"data: {json.dumps({'delta': delta})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat")
async def chat(body: dict):
    return StreamingResponse(sse_stream(body), media_type="text/event-stream")

前端用 EventSourcefetch() + ReadableStream 接收。如果你走 WebSocket,也是一样的思路——把 yield 改成 ws.send_json

mock_response 也能流式

写测试的时候别忘了:

resp = completion(
    model="gpt-4o-mini",
    messages=msgs,
    mock_response="春眠不觉晓, 处处闻啼鸟。",
    stream=True,
)
for chunk in resp:
    print(chunk.choices[0].delta.content or "", end="", flush=True)

LiteLLM 会把 mock 字符串切成一串 chunk 模拟真实 SSE 流。测 UI 非常爽。

常见坑汇总

  1. for chunk 忘了判空:chunk.choices[0].delta.content 可能为 None,直接 print 会输出 "None" 字符串。永远 or ""
  2. 在 FastAPI 的 sync endpoint 里用 completion:事件循环被阻塞,整个服务会卡。改成 async def + acompletion
  3. 流式时做 json.loads(chunk...):chunk 里 tool_calls 的 arguments 是部分字符串,解析必然失败。用 stream_chunk_builder
  4. usage 是 None:流式默认不返回,打开 stream_options={"include_usage": True}
  5. timeout 太短:复杂推理(o1/R1)可能 60 秒才回,默认 timeout 可能不够。显式设 90-120。
  6. Semaphore 忘记用 context manager:async with SEM: 必须包裹,不然异常发生时 permit 漏还,很快死锁。
  7. 前端看不到实时输出:检查是否有 nginx/网关开了 buffer。SSE 要加 X-Accel-Buffering: no 头。

本章小结