Queues:消息队列
创建与绑定
wrangler queues create send-email
# wrangler.toml 生产者 [[queues.producers]] queue = "send-email" binding = "EMAIL_Q" # 消费者(可以是另一个 Worker) [[queues.consumers]] queue = "send-email" max_batch_size = 10 max_batch_timeout = 5 # 最多攒 5s 或 10 条触发 max_retries = 3 dead_letter_queue = "send-email-dlq"
生产消息
app.post('/signup', async (c) => { const { email } = await c.req.json(); // 立即返回给用户,邮件走队列异步发 await c.env.EMAIL_Q.send({ type: 'welcome', to: email }); return c.json({ ok: true }); }); // 批量发 await c.env.EMAIL_Q.sendBatch([ { body: { type: 'news', to: 'a@x' } }, { body: { type: 'news', to: 'b@x' } }, ]);
消费消息
export default { async queue(batch: MessageBatch, env: Env) { for (const msg of batch.messages) { try { await sendEmail(msg.body); msg.ack(); // 成功 } catch (e) { msg.retry({ delaySeconds: 60 }); // 1 分钟后重试 } } }, };
至少一次投递
Queues 保证不丢消息,但可能重复。消费者要自己做幂等(比如按
msg.id 去重)。批处理
消费者一次收一批,你可以并发处理或一次性调外部 API(比如批量写数据库)。
DLQ
重试达到上限的消息进
dead_letter_queue——人工处理或另一个 Worker 扫描告警。Cron Triggers:定时任务
[triggers] crons = [ "*/5 * * * *", # 每 5 分钟 "0 2 * * *", # 每天凌晨 2 点 "0 0 * * 1", # 每周一零点 ]
export default { async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) { if (event.cron === '0 2 * * *') { ctx.waitUntil(runDailyReport(env)); } }, };
Cron + Queue 组合拳
Cron Worker 只做调度——扫描要处理的用户,把任务塞进 Queue;真正的工作交给 Queue 消费者(天生并发)。这是高并发定时任务的正确拆法。
Cron Worker 只做调度——扫描要处理的用户,把任务塞进 Queue;真正的工作交给 Queue 消费者(天生并发)。这是高并发定时任务的正确拆法。
Workflows:持久化长流程
2025 GA 的 Cloudflare Workflows——像 AWS Step Functions 但原生 JS。把一个跨分钟/小时的流程写成代码,每一步自动持久化,失败自动重试,断点续跑。
定义
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers'; export class OrderFulfillment extends WorkflowEntrypoint<Env, { orderId: string }> { async run(event: WorkflowEvent<{ orderId: string }>, step: WorkflowStep) { const order = await step.do('fetch order', async () => { return await this.env.DB.prepare('SELECT * FROM orders WHERE id=?') .bind(event.payload.orderId).first(); }); await step.do('charge card', { retries: { limit: 3, delay: '30 seconds' } }, async () => { await stripeCharge(order); }, ); await step.sleep('wait for warehouse', '30 minutes'); await step.do('ship', async () => { await callShippingAPI(order); }); await step.do('notify user', async () => { await this.env.EMAIL_Q.send({ type: 'shipped', to: order.email }); }); } }
触发
app.post('/checkout', async (c) => { const order = await createOrder(c); const instance = await c.env.ORDER_WF.create({ params: { orderId: order.id } }); return c.json({ workflowId: instance.id }); });
关键语义
step.do 持久化
每个 step 的输出落盘。即使整个流程崩溃重启,已完成的 step 不会重复运行。
step.sleep
可以睡几分钟、几小时、几天——期间不占 Worker 资源。到时自动唤醒续跑。
retries 配置
每个 step 自己配重试次数与间隔,失败只重跑那一步。
events / wait for
流程可以等一个外部事件(
step.waitForEvent)——人工审批、webhook 回调都方便。三者选型
| 需求 | 选什么 |
|---|---|
| 一次请求内触发,分钟内完成 | Queue(异步) |
| 周期性跑一段代码 | Cron Triggers |
| 多步、跨小时/天、要断点续 | Workflows |
| 单步但要 fanout 并发 | Cron 调度 + Queue 消费 |
| 实时协同 / 会话状态 | Durable Object(不是本章,但同类) |
实战组合:用户注册后的全套异步
用户 POST /signup
│
▼
Worker 响应 200 + 触发:
│
├─→ EMAIL_Q.send(欢迎邮件) ← Queue 异步发
├─→ ONBOARD_WF.create(new user) ← Workflow 跑 7 天引导
│ │
│ ├─ 立刻:送新人礼包
│ ├─ 1 天后:发 tips 邮件
│ ├─ 3 天后:检查激活,未激活送优惠券
│ └─ 7 天后:收尾报告入 analytics
└─→ DB.insert(user)
Cron 每天 02:00:
扫描"7 天未登录用户" → 塞入 EMAIL_Q 批量发唤醒邮件
本章小结
- Queues 解耦生产消费,批处理 + 至少一次投递 + DLQ
- Cron Triggers 写 crontab,scheduled handler 接收
- Workflows 管多步骤长流程,step.do 自动持久化,step.sleep 不占资源
- 三套互补:Queue 做并发、Cron 做调度、Workflow 做编排
- 常见架构:Cron 触发扫描 → Queue 扇出 → Workflow 驱动具体用户流程