一、Workflow = Event → Step → Event
核心模型非常简单:
- Event:继承
Event的 Pydantic 类,是 step 间传递的消息 - Step:
@step装饰的 async 方法,接收一个 Event 类型,return另一个 Event - StartEvent / StopEvent:入口和出口
from llama_index.core.workflow import Workflow, step, Event, StartEvent, StopEvent
class RetrievedEvent(Event):
nodes: list
class MyRAG(Workflow):
@step
async def retrieve(self, ev: StartEvent) -> RetrievedEvent:
nodes = retriever.retrieve(ev.query)
return RetrievedEvent(nodes=nodes)
@step
async def synthesize(self, ev: RetrievedEvent) -> StopEvent:
ans = await synth.asynthesize(ev.nodes)
return StopEvent(result=ans)
wf = MyRAG()
result = await wf.run(query="...是什么?")
读成句子就是:start 事件 → retrieve 返回 RetrievedEvent → synthesize 吃 RetrievedEvent → 返回 stop。LlamaIndex 运行时根据 event 类型自动路由,你不用手写 DAG。
二、分支:根据条件选路径
class RelevantEvent(Event):
nodes: list
class RewriteEvent(Event):
new_query: str
class SelfRAG(Workflow):
@step
async def retrieve(self, ev: StartEvent) -> RelevantEvent | RewriteEvent:
nodes = retriever.retrieve(ev.query)
score = await evaluator.aevaluate(ev.query, nodes)
if score > 0.6:
return RelevantEvent(nodes=nodes)
else:
new = await llm.acomplete(f"改写这个查询:{ev.query}")
return RewriteEvent(new_query=new.text)
@step
async def retry(self, ev: RewriteEvent) -> RelevantEvent:
nodes = retriever.retrieve(ev.new_query)
return RelevantEvent(nodes=nodes)
@step
async def answer(self, ev: RelevantEvent) -> StopEvent:
return StopEvent(result=await synth.asynthesize(ev.nodes))
一个 step 的 return type 是 A | B 就代表两种分支——运行时根据实际返回的 Event 类型路由。
三、并行与合流
from llama_index.core.workflow import Context
class QueryPart(Event):
sub_query: str
class SubAnswer(Event):
answer: str
class Parallel(Workflow):
@step
async def split(self, ctx: Context, ev: StartEvent) -> QueryPart:
sub_qs = [f"{ev.query} 第{i}部分" for i in range(3)]
# send_event 可以发多个同类 event,触发并行
for q in sub_qs:
ctx.send_event(QueryPart(sub_query=q))
@step(num_workers=3) # 并发度
async def answer_sub(self, ev: QueryPart) -> SubAnswer:
ans = await qe.aquery(ev.sub_query)
return SubAnswer(answer=str(ans))
@step
async def merge(self, ctx: Context, ev: SubAnswer) -> StopEvent | None:
# collect_events 会等到收到 N 个同类 event 才继续
results = ctx.collect_events(ev, [SubAnswer] * 3)
if results is None:
return None # 还没收齐,挂起
final = "\n".join(r.answer for r in results)
return StopEvent(result=final)
关键 API 两个:
ctx.send_event(ev)— 发任意多个 event,触发扇出ctx.collect_events(ev, [E1, E2, ...])— 等齐一组 event 后合流,未齐返回 None
四、Context:跨 step 的共享状态
class MyWF(Workflow):
@step
async def init(self, ctx: Context, ev: StartEvent) -> SomeEvent:
async with ctx.store.edit_state() as state:
state["query"] = ev.query
state["attempts"] = 0
return SomeEvent()
@step
async def check(self, ctx: Context, ev: SomeEvent) -> StopEvent:
state = await ctx.store.get_state()
query = state["query"]
return StopEvent(result=query)
Context 不单是 key-value store——它还管事件分发、并行 workers、waiter 队列,是 Workflow 的心脏。
五、HITL:等待人工输入
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent
class RefundWF(Workflow):
@step
async def review(self, ctx: Context, ev: StartEvent) -> StopEvent:
human = await ctx.wait_for_event(
HumanResponseEvent,
waiter_event=InputRequiredEvent(
prefix=f"订单 {ev.order_id},退款 {ev.amount},yes/no?"
),
)
if human.response == "yes":
refund_api.call(ev.order_id, ev.amount)
return StopEvent(result="done")
return StopEvent(result="cancelled")
# 外部注入 HumanResponse:
handler = wf.run(order_id="o-1", amount=99.9)
async for ev in handler.stream_events():
if isinstance(ev, InputRequiredEvent):
answer = input(ev.prefix) # 或从 UI/队列读
handler.ctx.send_event(HumanResponseEvent(response=answer))
HITL 在金融/法律/HR 这类不能全自动的流程里不可或缺——Workflow 原生支持,比起自己写 threading/queue 不知高到哪里去。
六、Checkpointing:崩了能续跑
from llama_index.core.workflow.checkpointer import WorkflowCheckpointer
cp = WorkflowCheckpointer(workflow=wf)
handler = cp.run(query="...")
result = await handler
print(cp.checkpoints[handler.run_id]) # 每个 step 后的完整 context
# 从某个 checkpoint 续跑(比如崩溃后恢复)
handler2 = cp.run_from(checkpoint=cp.checkpoints[handler.run_id][-1])
七、可视化
from llama_index.utils.workflow import draw_all_possible_flows, draw_most_recent_execution
# 静态:所有可能路径
draw_all_possible_flows(MyRAG, filename="rag.html")
# 动态:这次运行实际走的路径
draw_most_recent_execution(wf, filename="last_run.html")
生成交互式 HTML,复杂 workflow 调试神器——看着图改比看代码改快十倍。
八、完整例子:Corrective RAG with Workflow
from llama_index.core.workflow import Workflow, step, Event, StartEvent, StopEvent, Context
class RetrievedEvent(Event):
query: str
nodes: list
relevance: float
class WebFallbackEvent(Event):
query: str
class FinalEvent(Event):
query: str
nodes: list
class CRAG(Workflow):
@step
async def retrieve(self, ev: StartEvent) -> RetrievedEvent:
nodes = retriever.retrieve(ev.query)
score = await evaluator.aevaluate(ev.query, nodes)
return RetrievedEvent(query=ev.query, nodes=nodes, relevance=score)
@step
async def decide(self, ev: RetrievedEvent) -> FinalEvent | WebFallbackEvent:
if ev.relevance >= 0.7:
return FinalEvent(query=ev.query, nodes=ev.nodes)
return WebFallbackEvent(query=ev.query)
@step
async def web_search(self, ev: WebFallbackEvent) -> FinalEvent:
web_nodes = await tavily.search(ev.query)
return FinalEvent(query=ev.query, nodes=web_nodes)
@step
async def synthesize(self, ev: FinalEvent) -> StopEvent:
resp = await synth.asynthesize(query=ev.query, nodes=ev.nodes)
return StopEvent(result=resp)
wf = CRAG(timeout=60, verbose=True)
result = await wf.run(query="最近的 Model Context Protocol 进展")
九、流式事件:给前端实时反馈
handler = wf.run(query="...")
async for ev in handler.stream_events():
if isinstance(ev, RetrievedEvent):
yield f"已召回 {len(ev.nodes)} 条"
elif isinstance(ev, WebFallbackEvent):
yield "内部知识不足,搜一下网..."
result = await handler
十、嵌套 Workflow
一个 workflow 可以当成 step 嵌到另一个 workflow。适合把通用子流程(比如"检索 + rerank")封装复用:
class RetrievalWF(Workflow): ... # 子流程
class BigWF(Workflow):
@step
async def call_retrieval(self, ev: StartEvent) -> SomeEvent:
sub = RetrievalWF()
r = await sub.run(query=ev.query)
return SomeEvent(nodes=r)
十一、Workflow vs Agent vs QueryEngine
| 形态 | 控制流由谁决定 | 适合 |
|---|---|---|
| QueryEngine | 硬编码(Retriever→Postproc→Synth) | 固定 RAG 流程 |
| Agent | LLM 每轮决定 | 开放任务、工具组合 |
| Workflow | 代码写死 + event 路由 | 步骤清晰、要可控可视化的复杂流程 |
选型建议:
① 简单 RAG → QueryEngine
② 用户意图开放、工具场景 → Agent
③ 业务流程固定但有分支/并行/HITL → Workflow 🔥
④ 生产级 Self-RAG/CRAG → Workflow 里封装 Agent step
① 简单 RAG → QueryEngine
② 用户意图开放、工具场景 → Agent
③ 业务流程固定但有分支/并行/HITL → Workflow 🔥
④ 生产级 Self-RAG/CRAG → Workflow 里封装 Agent step
十二、Workflow vs LangGraph
同是事件驱动 DAG,两者对比:
| 维度 | LlamaIndex Workflow | LangGraph |
|---|---|---|
| 抽象 | Event + @step(Python 类型驱动) | Node + Edge(TypedDict state) |
| 路由 | return 的 event 类型决定 | 条件函数 or 固定 edge |
| 生态 | RAG 深度整合 | Agent/LLM orchestration 深 |
| HITL | wait_for_event 原生 | interrupt 原生 |
| checkpoint | WorkflowCheckpointer | SQLite/Postgres backend |
| 可视化 | HTML 渲染 | Graph studio 可视化界面 |
两者没有绝对高下——已经用 LlamaIndex 做 RAG 就顺手用 Workflow,已经用 LangChain 就 LangGraph。
十三、反模式
- step 里写同步 I/O:阻塞事件循环,并发全失效。永远 async。
- Event 类塞大对象:Event 本质是消息,含几 MB 的 nodes 会在内存里被复制多次。存 Context,事件传引用。
- collect_events 数量写错:永远收不齐,workflow 挂起直到 timeout。
- 不设 timeout:LLM 慢一次就把整个流程拖垮——每个 workflow 都要设 timeout。
- 把 Workflow 当状态机用但不画图:两三个分支起复杂后自己都会晕,画一次
draw_all_possible_flows省时间。 - 嵌套 workflow 层级过深:调试时事件流跨层,定位难。两层以内比较合理。
- 忘了 checkpoint 恢复场景:长流程(分钟级)崩溃重试成本高,生产一定加 checkpointer。
- 多个 step 共用一个 Event 类型:类型驱动路由会混淆——每个 step 一个专用 Event 类型更清晰。
十四、本章小结
记住:
① Workflow 是"可控的编排工具"——有明确步骤、分支、并行、HITL 时用它,不用自己写 queue + state machine。
② Event 驱动 + return 类型路由,比 LangGraph 的 add_edge 更 Pythonic——改流程只改类型签名。
③
④ 生产必配:timeout + checkpointer + 可视化。Self-RAG、CRAG、多阶段审批流——都是 Workflow 的典型场景。
① Workflow 是"可控的编排工具"——有明确步骤、分支、并行、HITL 时用它,不用自己写 queue + state machine。
② Event 驱动 + return 类型路由,比 LangGraph 的 add_edge 更 Pythonic——改流程只改类型签名。
③
ctx.send_event / collect_events 处理并行扇出合流,wait_for_event 处理 HITL。
④ 生产必配:timeout + checkpointer + 可视化。Self-RAG、CRAG、多阶段审批流——都是 Workflow 的典型场景。