什么时候用 Workflow 而不是 Agent
| 场景 | Agent | Workflow |
|---|---|---|
| 用户聊天 | ✅ | — |
| 结构固定的多步流程 | ⚠️ | ✅ |
| 需要人工审核再继续 | — | ✅(suspend/resume) |
| 大批量并行处理 | — | ✅(foreach / parallel) |
| 流程要可视化给产品经理看 | — | ✅(DAG 图) |
| 任意推理 + 工具使用 | ✅ | ⚠️ |
最小 Workflow
import { createWorkflow, createStep } from '@mastra/core/workflows'; import { z } from 'zod'; const parseStep = createStep({ id: 'parse', inputSchema: z.object({ url: z.string().url() }), outputSchema: z.object({ text: z.string() }), execute: async ({ inputData }) => { const res = await fetch(inputData.url); return { text: await res.text() }; }, }); const summarizeStep = createStep({ id: 'summarize', inputSchema: z.object({ text: z.string() }), outputSchema: z.object({ summary: z.string() }), execute: async ({ inputData, mastra }) => { const agent = mastra.getAgent('summarizer'); const { text } = await agent.generate(inputData.text); return { summary: text }; }, }); export const pageSummary = createWorkflow({ id: 'page-summary', inputSchema: z.object({ url: z.string().url() }), outputSchema: z.object({ summary: z.string() }), }) .then(parseStep) .then(summarizeStep) .commit();
.commit() 把 Workflow 冻结成不可变定义,再注册到 Mastra:
export const mastra = new Mastra({ agents: { summarizer }, workflows: { pageSummary }, });
七种控制流原语
.then(step)
串行链:上一步输出自动作为下一步输入(类型强制对齐)。
.parallel([s1, s2, s3])
并行:三个 step 拿同一输入,返回
{ s1, s2, s3 }。.branch([[cond1, s1], [cond2, s2]])
条件分支:按顺序评估,第一个为真的 step 执行。
.foreach(step)
对数组的每个元素跑一次 step,收集结果(可配置并发度)。
.dountil(step, cond)
循环:执行 step,判断 cond,为假则继续,适合"反复润色直到满意"。
.dowhile(step, cond)
与 dountil 相反,条件为真才继续。
.map((prev) => next)
纯函数转换,用来对齐上下游 step 的 schema,不调模型也不走 IO。
并行 + 合并示例
const research = createWorkflow({ id: 'research', ... }) .then(planStep) .parallel([searchWebStep, searchDocsStep, searchCodeStep]) .map(({ searchWebStep, searchDocsStep, searchCodeStep }) => ({ // 合并三路结果为统一上下文 evidence: [ ...searchWebStep.hits, ...searchDocsStep.hits, ...searchCodeStep.hits, ], })) .then(writeReportStep) .commit();
三个 search 步骤同时跑,合起来作为上下文给 writeReport。在 Playground 的 Workflow 面板里会看到三个节点横向排开,状态绿色同步变化。
条件分支
workflow .then(classifyStep) // { category: 'bug' | 'feature' | 'question' } .branch([ [(ctx) => ctx.category === 'bug', handleBugStep], [(ctx) => ctx.category === 'feature', handleFeatureStep], [() => true, answerQuestionStep], // 默认分支 ]) .commit();
循环:反复打磨直到满意
const reviseStep = createStep({ id: 'revise', inputSchema: z.object({ draft: z.string(), feedback: z.string().optional() }), outputSchema: z.object({ draft: z.string(), score: z.number() }), execute: async ({ inputData, mastra }) => { const writer = mastra.getAgent('writer'); const judge = mastra.getAgent('judge'); const { text } = await writer.generate( `根据反馈修改稿件。反馈:${inputData.feedback ?? '(首次)'} 原稿:${inputData.draft}` ); const { object } = await judge.generate(text, { output: z.object({ score: z.number().min(0).max(10), feedback: z.string() }), }); return { draft: text, score: object.score }; }, }); workflow.dountil(reviseStep, (ctx) => ctx.score >= 8);
foreach:批量处理
const processOne = createStep({ id: 'process-one', inputSchema: z.object({ id: z.string() }), outputSchema: z.object({ id: z.string(), result: z.string() }), execute: async ({ inputData }) => { ... }, }); workflow .then(fetchListStep) // 返回 { items: [{id: ...}, ...] } .foreach(processOne, { concurrency: 5 }) // 最多 5 个并发 .commit();
暂停与恢复(Human-in-the-loop)
const approvalStep = createStep({ id: 'approval', inputSchema: z.object({ proposal: z.string() }), resumeSchema: z.object({ approved: z.boolean() }), outputSchema: z.object({ approved: z.boolean() }), execute: async ({ inputData, suspend, resumeData }) => { if (!resumeData) { // 第一次执行到这里:发通知给审核员,暂停 await notifyReviewer(inputData.proposal); await suspend({ pendingProposal: inputData.proposal }); return { approved: false }; // unreachable } // 审核员点了按钮后用 resumeData 恢复 return { approved: resumeData.approved }; }, });
// 外部触发 resume const run = await mastra.getWorkflow('approval-flow').resume({ runId: 'xxxx', stepId: 'approval', resumeData: { approved: true }, });
运行 Workflow
const wf = mastra.getWorkflow('pageSummary'); // 同步等完 const result = await wf.start({ inputData: { url: 'https://...' } }); console.log(result.result); // 监听每步变化 const run = await wf.stream({ inputData: { url: '...' } }); for await (const evt of run) { if (evt.type === 'step-finish') { console.log(evt.stepId, evt.output); } }
Workflow 与 Agent 嵌套
- Workflow 调 Agent:step 里
mastra.getAgent(),Agent 提供智能决策。 - Agent 调 Workflow:把 Workflow 包装成 Tool,Agent 自行决定什么时候启动流程。
import { createTool } from '@mastra/core/tools'; export const runResearchTool = createTool({ id: 'run-research', description: '启动深度研究工作流', inputSchema: z.object({ topic: z.string() }), execute: async ({ context, mastra }) => { const wf = mastra.getWorkflow('researchFlow'); const { result } = await wf.start({ inputData: { topic: context.topic } }); return result; }, });
本章小结
- Workflow = 显式 DAG,step 的 input/output 由 Zod 校验
- 七种控制流:then、parallel、branch、foreach、dountil、dowhile、map
- suspend / resume 支持人工审核、等待外部事件
- Agent 和 Workflow 可互相调用:Agent 决策 + Workflow 固化流程
- Playground 的 Workflow DAG 视图把运行状态实时画给你看