Chapter 05

Workflows:可观测的多步骤编排

Agent 自由度高但不可控,裸 while 循环可控但不可观测。Workflow 是折中——显式声明 step 与控制流,Mastra 自动记录每步输入输出,支持分支、并行、循环、暂停恢复。

什么时候用 Workflow 而不是 Agent

场景AgentWorkflow
用户聊天
结构固定的多步流程⚠️
需要人工审核再继续✅(suspend/resume)
大批量并行处理✅(foreach / parallel)
流程要可视化给产品经理看✅(DAG 图)
任意推理 + 工具使用⚠️

最小 Workflow

import { createWorkflow, createStep } from '@mastra/core/workflows';
import { z } from 'zod';

const parseStep = createStep({
  id: 'parse',
  inputSchema: z.object({ url: z.string().url() }),
  outputSchema: z.object({ text: z.string() }),
  execute: async ({ inputData }) => {
    const res = await fetch(inputData.url);
    return { text: await res.text() };
  },
});

const summarizeStep = createStep({
  id: 'summarize',
  inputSchema: z.object({ text: z.string() }),
  outputSchema: z.object({ summary: z.string() }),
  execute: async ({ inputData, mastra }) => {
    const agent = mastra.getAgent('summarizer');
    const { text } = await agent.generate(inputData.text);
    return { summary: text };
  },
});

export const pageSummary = createWorkflow({
  id: 'page-summary',
  inputSchema: z.object({ url: z.string().url() }),
  outputSchema: z.object({ summary: z.string() }),
})
  .then(parseStep)
  .then(summarizeStep)
  .commit();

.commit() 把 Workflow 冻结成不可变定义,再注册到 Mastra:

export const mastra = new Mastra({
  agents: { summarizer },
  workflows: { pageSummary },
});

七种控制流原语

.then(step)
串行链:上一步输出自动作为下一步输入(类型强制对齐)。
.parallel([s1, s2, s3])
并行:三个 step 拿同一输入,返回 { s1, s2, s3 }
.branch([[cond1, s1], [cond2, s2]])
条件分支:按顺序评估,第一个为真的 step 执行。
.foreach(step)
对数组的每个元素跑一次 step,收集结果(可配置并发度)。
.dountil(step, cond)
循环:执行 step,判断 cond,为假则继续,适合"反复润色直到满意"。
.dowhile(step, cond)
与 dountil 相反,条件为真才继续。
.map((prev) => next)
纯函数转换,用来对齐上下游 step 的 schema,不调模型也不走 IO。

并行 + 合并示例

const research = createWorkflow({ id: 'research', ... })
  .then(planStep)
  .parallel([searchWebStep, searchDocsStep, searchCodeStep])
  .map(({ searchWebStep, searchDocsStep, searchCodeStep }) => ({
    // 合并三路结果为统一上下文
    evidence: [
      ...searchWebStep.hits,
      ...searchDocsStep.hits,
      ...searchCodeStep.hits,
    ],
  }))
  .then(writeReportStep)
  .commit();

三个 search 步骤同时跑,合起来作为上下文给 writeReport。在 Playground 的 Workflow 面板里会看到三个节点横向排开,状态绿色同步变化。

条件分支

workflow
  .then(classifyStep)  // { category: 'bug' | 'feature' | 'question' }
  .branch([
    [(ctx) => ctx.category === 'bug', handleBugStep],
    [(ctx) => ctx.category === 'feature', handleFeatureStep],
    [() => true, answerQuestionStep],  // 默认分支
  ])
  .commit();

循环:反复打磨直到满意

const reviseStep = createStep({
  id: 'revise',
  inputSchema: z.object({ draft: z.string(), feedback: z.string().optional() }),
  outputSchema: z.object({ draft: z.string(), score: z.number() }),
  execute: async ({ inputData, mastra }) => {
    const writer = mastra.getAgent('writer');
    const judge  = mastra.getAgent('judge');
    const { text } = await writer.generate(
      `根据反馈修改稿件。反馈:${inputData.feedback ?? '(首次)'}
       原稿:${inputData.draft}`
    );
    const { object } = await judge.generate(text, {
      output: z.object({ score: z.number().min(0).max(10), feedback: z.string() }),
    });
    return { draft: text, score: object.score };
  },
});

workflow.dountil(reviseStep, (ctx) => ctx.score >= 8);

foreach:批量处理

const processOne = createStep({
  id: 'process-one',
  inputSchema: z.object({ id: z.string() }),
  outputSchema: z.object({ id: z.string(), result: z.string() }),
  execute: async ({ inputData }) => {
    ...
  },
});

workflow
  .then(fetchListStep)             // 返回 { items: [{id: ...}, ...] }
  .foreach(processOne, { concurrency: 5 })  // 最多 5 个并发
  .commit();

暂停与恢复(Human-in-the-loop)

const approvalStep = createStep({
  id: 'approval',
  inputSchema: z.object({ proposal: z.string() }),
  resumeSchema: z.object({ approved: z.boolean() }),
  outputSchema: z.object({ approved: z.boolean() }),
  execute: async ({ inputData, suspend, resumeData }) => {
    if (!resumeData) {
      // 第一次执行到这里:发通知给审核员,暂停
      await notifyReviewer(inputData.proposal);
      await suspend({ pendingProposal: inputData.proposal });
      return { approved: false };  // unreachable
    }
    // 审核员点了按钮后用 resumeData 恢复
    return { approved: resumeData.approved };
  },
});
// 外部触发 resume
const run = await mastra.getWorkflow('approval-flow').resume({
  runId: 'xxxx',
  stepId: 'approval',
  resumeData: { approved: true },
});

运行 Workflow

const wf = mastra.getWorkflow('pageSummary');

// 同步等完
const result = await wf.start({ inputData: { url: 'https://...' } });
console.log(result.result);

// 监听每步变化
const run = await wf.stream({ inputData: { url: '...' } });
for await (const evt of run) {
  if (evt.type === 'step-finish') {
    console.log(evt.stepId, evt.output);
  }
}

Workflow 与 Agent 嵌套

import { createTool } from '@mastra/core/tools';

export const runResearchTool = createTool({
  id: 'run-research',
  description: '启动深度研究工作流',
  inputSchema: z.object({ topic: z.string() }),
  execute: async ({ context, mastra }) => {
    const wf = mastra.getWorkflow('researchFlow');
    const { result } = await wf.start({ inputData: { topic: context.topic } });
    return result;
  },
});

本章小结