Chapter 10

Workflow · 事件驱动的复杂流程

QueryEngine 处理"一问一答",Agent 处理"LLM 自主循环",Workflow 处理"有明确步骤但步骤间会分支/并行/等待"的流程。LlamaIndex 的 Workflow 是 LangGraph 的对位物——事件驱动、@step 装饰器、原生支持并行和 HITL。

一、Workflow = Event → Step → Event

核心模型非常简单:

from llama_index.core.workflow import Workflow, step, Event, StartEvent, StopEvent

class RetrievedEvent(Event):
    nodes: list

class MyRAG(Workflow):
    @step
    async def retrieve(self, ev: StartEvent) -> RetrievedEvent:
        nodes = retriever.retrieve(ev.query)
        return RetrievedEvent(nodes=nodes)

    @step
    async def synthesize(self, ev: RetrievedEvent) -> StopEvent:
        ans = await synth.asynthesize(ev.nodes)
        return StopEvent(result=ans)

wf = MyRAG()
result = await wf.run(query="...是什么?")

读成句子就是:start 事件 → retrieve 返回 RetrievedEvent → synthesize 吃 RetrievedEvent → 返回 stop。LlamaIndex 运行时根据 event 类型自动路由,你不用手写 DAG。

二、分支:根据条件选路径

class RelevantEvent(Event):
    nodes: list

class RewriteEvent(Event):
    new_query: str

class SelfRAG(Workflow):
    @step
    async def retrieve(self, ev: StartEvent) -> RelevantEvent | RewriteEvent:
        nodes = retriever.retrieve(ev.query)
        score = await evaluator.aevaluate(ev.query, nodes)
        if score > 0.6:
            return RelevantEvent(nodes=nodes)
        else:
            new = await llm.acomplete(f"改写这个查询:{ev.query}")
            return RewriteEvent(new_query=new.text)

    @step
    async def retry(self, ev: RewriteEvent) -> RelevantEvent:
        nodes = retriever.retrieve(ev.new_query)
        return RelevantEvent(nodes=nodes)

    @step
    async def answer(self, ev: RelevantEvent) -> StopEvent:
        return StopEvent(result=await synth.asynthesize(ev.nodes))

一个 step 的 return type 是 A | B 就代表两种分支——运行时根据实际返回的 Event 类型路由。

三、并行与合流

from llama_index.core.workflow import Context

class QueryPart(Event):
    sub_query: str

class SubAnswer(Event):
    answer: str

class Parallel(Workflow):
    @step
    async def split(self, ctx: Context, ev: StartEvent) -> QueryPart:
        sub_qs = [f"{ev.query}{i}部分" for i in range(3)]
        # send_event 可以发多个同类 event,触发并行
        for q in sub_qs:
            ctx.send_event(QueryPart(sub_query=q))

    @step(num_workers=3)        # 并发度
    async def answer_sub(self, ev: QueryPart) -> SubAnswer:
        ans = await qe.aquery(ev.sub_query)
        return SubAnswer(answer=str(ans))

    @step
    async def merge(self, ctx: Context, ev: SubAnswer) -> StopEvent | None:
        # collect_events 会等到收到 N 个同类 event 才继续
        results = ctx.collect_events(ev, [SubAnswer] * 3)
        if results is None:
            return None     # 还没收齐,挂起
        final = "\n".join(r.answer for r in results)
        return StopEvent(result=final)

关键 API 两个:

四、Context:跨 step 的共享状态

class MyWF(Workflow):
    @step
    async def init(self, ctx: Context, ev: StartEvent) -> SomeEvent:
        async with ctx.store.edit_state() as state:
            state["query"] = ev.query
            state["attempts"] = 0
        return SomeEvent()

    @step
    async def check(self, ctx: Context, ev: SomeEvent) -> StopEvent:
        state = await ctx.store.get_state()
        query = state["query"]
        return StopEvent(result=query)

Context 不单是 key-value store——它还管事件分发、并行 workers、waiter 队列,是 Workflow 的心脏。

五、HITL:等待人工输入

from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent

class RefundWF(Workflow):
    @step
    async def review(self, ctx: Context, ev: StartEvent) -> StopEvent:
        human = await ctx.wait_for_event(
            HumanResponseEvent,
            waiter_event=InputRequiredEvent(
                prefix=f"订单 {ev.order_id},退款 {ev.amount},yes/no?"
            ),
        )
        if human.response == "yes":
            refund_api.call(ev.order_id, ev.amount)
            return StopEvent(result="done")
        return StopEvent(result="cancelled")

# 外部注入 HumanResponse:
handler = wf.run(order_id="o-1", amount=99.9)
async for ev in handler.stream_events():
    if isinstance(ev, InputRequiredEvent):
        answer = input(ev.prefix)             # 或从 UI/队列读
        handler.ctx.send_event(HumanResponseEvent(response=answer))

HITL 在金融/法律/HR 这类不能全自动的流程里不可或缺——Workflow 原生支持,比起自己写 threading/queue 不知高到哪里去。

六、Checkpointing:崩了能续跑

from llama_index.core.workflow.checkpointer import WorkflowCheckpointer

cp = WorkflowCheckpointer(workflow=wf)
handler = cp.run(query="...")
result = await handler
print(cp.checkpoints[handler.run_id])    # 每个 step 后的完整 context

# 从某个 checkpoint 续跑(比如崩溃后恢复)
handler2 = cp.run_from(checkpoint=cp.checkpoints[handler.run_id][-1])

七、可视化

from llama_index.utils.workflow import draw_all_possible_flows, draw_most_recent_execution

# 静态:所有可能路径
draw_all_possible_flows(MyRAG, filename="rag.html")

# 动态:这次运行实际走的路径
draw_most_recent_execution(wf, filename="last_run.html")

生成交互式 HTML,复杂 workflow 调试神器——看着图改比看代码改快十倍。

八、完整例子:Corrective RAG with Workflow

from llama_index.core.workflow import Workflow, step, Event, StartEvent, StopEvent, Context

class RetrievedEvent(Event):
    query: str
    nodes: list
    relevance: float

class WebFallbackEvent(Event):
    query: str

class FinalEvent(Event):
    query: str
    nodes: list

class CRAG(Workflow):
    @step
    async def retrieve(self, ev: StartEvent) -> RetrievedEvent:
        nodes = retriever.retrieve(ev.query)
        score = await evaluator.aevaluate(ev.query, nodes)
        return RetrievedEvent(query=ev.query, nodes=nodes, relevance=score)

    @step
    async def decide(self, ev: RetrievedEvent) -> FinalEvent | WebFallbackEvent:
        if ev.relevance >= 0.7:
            return FinalEvent(query=ev.query, nodes=ev.nodes)
        return WebFallbackEvent(query=ev.query)

    @step
    async def web_search(self, ev: WebFallbackEvent) -> FinalEvent:
        web_nodes = await tavily.search(ev.query)
        return FinalEvent(query=ev.query, nodes=web_nodes)

    @step
    async def synthesize(self, ev: FinalEvent) -> StopEvent:
        resp = await synth.asynthesize(query=ev.query, nodes=ev.nodes)
        return StopEvent(result=resp)

wf = CRAG(timeout=60, verbose=True)
result = await wf.run(query="最近的 Model Context Protocol 进展")

九、流式事件:给前端实时反馈

handler = wf.run(query="...")
async for ev in handler.stream_events():
    if isinstance(ev, RetrievedEvent):
        yield f"已召回 {len(ev.nodes)} 条"
    elif isinstance(ev, WebFallbackEvent):
        yield "内部知识不足,搜一下网..."

result = await handler

十、嵌套 Workflow

一个 workflow 可以当成 step 嵌到另一个 workflow。适合把通用子流程(比如"检索 + rerank")封装复用:

class RetrievalWF(Workflow): ...     # 子流程

class BigWF(Workflow):
    @step
    async def call_retrieval(self, ev: StartEvent) -> SomeEvent:
        sub = RetrievalWF()
        r = await sub.run(query=ev.query)
        return SomeEvent(nodes=r)

十一、Workflow vs Agent vs QueryEngine

形态控制流由谁决定适合
QueryEngine硬编码(Retriever→Postproc→Synth)固定 RAG 流程
AgentLLM 每轮决定开放任务、工具组合
Workflow代码写死 + event 路由步骤清晰、要可控可视化的复杂流程
选型建议:
① 简单 RAG → QueryEngine
② 用户意图开放、工具场景 → Agent
③ 业务流程固定但有分支/并行/HITL → Workflow 🔥
④ 生产级 Self-RAG/CRAG → Workflow 里封装 Agent step

十二、Workflow vs LangGraph

同是事件驱动 DAG,两者对比:

维度LlamaIndex WorkflowLangGraph
抽象Event + @step(Python 类型驱动)Node + Edge(TypedDict state)
路由return 的 event 类型决定条件函数 or 固定 edge
生态RAG 深度整合Agent/LLM orchestration 深
HITLwait_for_event 原生interrupt 原生
checkpointWorkflowCheckpointerSQLite/Postgres backend
可视化HTML 渲染Graph studio 可视化界面

两者没有绝对高下——已经用 LlamaIndex 做 RAG 就顺手用 Workflow,已经用 LangChain 就 LangGraph

十三、反模式

  1. step 里写同步 I/O:阻塞事件循环,并发全失效。永远 async。
  2. Event 类塞大对象:Event 本质是消息,含几 MB 的 nodes 会在内存里被复制多次。存 Context,事件传引用。
  3. collect_events 数量写错:永远收不齐,workflow 挂起直到 timeout。
  4. 不设 timeout:LLM 慢一次就把整个流程拖垮——每个 workflow 都要设 timeout
  5. 把 Workflow 当状态机用但不画图:两三个分支起复杂后自己都会晕,画一次 draw_all_possible_flows 省时间。
  6. 嵌套 workflow 层级过深:调试时事件流跨层,定位难。两层以内比较合理。
  7. 忘了 checkpoint 恢复场景:长流程(分钟级)崩溃重试成本高,生产一定加 checkpointer。
  8. 多个 step 共用一个 Event 类型:类型驱动路由会混淆——每个 step 一个专用 Event 类型更清晰。

十四、本章小结

记住:
① Workflow 是"可控的编排工具"——有明确步骤、分支、并行、HITL 时用它,不用自己写 queue + state machine。
② Event 驱动 + return 类型路由,比 LangGraph 的 add_edge 更 Pythonic——改流程只改类型签名。
ctx.send_event / collect_events 处理并行扇出合流,wait_for_event 处理 HITL。
④ 生产必配:timeout + checkpointer + 可视化。Self-RAG、CRAG、多阶段审批流——都是 Workflow 的典型场景。