Chapter 07

Queues、Cron 与 Workflows

不是所有事都能在请求内完成:发邮件、生成缩略图、每天对账、跨系统编排。本章把 Cloudflare 的三套异步机制讲清——Queues 解耦生产消费,Cron 触发定时,Workflows 管理长流程。

Queues:消息队列

创建与绑定

wrangler queues create send-email
# wrangler.toml 生产者
[[queues.producers]]
queue = "send-email"
binding = "EMAIL_Q"

# 消费者(可以是另一个 Worker)
[[queues.consumers]]
queue = "send-email"
max_batch_size = 10
max_batch_timeout = 5   # 最多攒 5s 或 10 条触发
max_retries = 3
dead_letter_queue = "send-email-dlq"

生产消息

app.post('/signup', async (c) => {
  const { email } = await c.req.json();
  // 立即返回给用户,邮件走队列异步发
  await c.env.EMAIL_Q.send({ type: 'welcome', to: email });
  return c.json({ ok: true });
});

// 批量发
await c.env.EMAIL_Q.sendBatch([
  { body: { type: 'news', to: 'a@x' } },
  { body: { type: 'news', to: 'b@x' } },
]);

消费消息

export default {
  async queue(batch: MessageBatch, env: Env) {
    for (const msg of batch.messages) {
      try {
        await sendEmail(msg.body);
        msg.ack();  // 成功
      } catch (e) {
        msg.retry({ delaySeconds: 60 });  // 1 分钟后重试
      }
    }
  },
};
至少一次投递
Queues 保证不丢消息,但可能重复。消费者要自己做幂等(比如按 msg.id 去重)。
批处理
消费者一次收一批,你可以并发处理或一次性调外部 API(比如批量写数据库)。
DLQ
重试达到上限的消息进 dead_letter_queue——人工处理或另一个 Worker 扫描告警。

Cron Triggers:定时任务

[triggers]
crons = [
  "*/5 * * * *",         # 每 5 分钟
  "0 2 * * *",            # 每天凌晨 2 点
  "0 0 * * 1",            # 每周一零点
]
export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    if (event.cron === '0 2 * * *') {
      ctx.waitUntil(runDailyReport(env));
    }
  },
};
Cron + Queue 组合拳
Cron Worker 只做调度——扫描要处理的用户,把任务塞进 Queue;真正的工作交给 Queue 消费者(天生并发)。这是高并发定时任务的正确拆法。

Workflows:持久化长流程

2025 GA 的 Cloudflare Workflows——像 AWS Step Functions 但原生 JS。把一个跨分钟/小时的流程写成代码,每一步自动持久化,失败自动重试,断点续跑。

定义

import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';

export class OrderFulfillment extends WorkflowEntrypoint<Env, { orderId: string }> {
  async run(event: WorkflowEvent<{ orderId: string }>, step: WorkflowStep) {
    const order = await step.do('fetch order', async () => {
      return await this.env.DB.prepare('SELECT * FROM orders WHERE id=?')
        .bind(event.payload.orderId).first();
    });

    await step.do('charge card', { retries: { limit: 3, delay: '30 seconds' } },
      async () => {
        await stripeCharge(order);
      },
    );

    await step.sleep('wait for warehouse', '30 minutes');

    await step.do('ship', async () => {
      await callShippingAPI(order);
    });

    await step.do('notify user', async () => {
      await this.env.EMAIL_Q.send({ type: 'shipped', to: order.email });
    });
  }
}

触发

app.post('/checkout', async (c) => {
  const order = await createOrder(c);
  const instance = await c.env.ORDER_WF.create({ params: { orderId: order.id } });
  return c.json({ workflowId: instance.id });
});

关键语义

step.do 持久化
每个 step 的输出落盘。即使整个流程崩溃重启,已完成的 step 不会重复运行。
step.sleep
可以睡几分钟、几小时、几天——期间不占 Worker 资源。到时自动唤醒续跑。
retries 配置
每个 step 自己配重试次数与间隔,失败只重跑那一步。
events / wait for
流程可以等一个外部事件(step.waitForEvent)——人工审批、webhook 回调都方便。

三者选型

需求选什么
一次请求内触发,分钟内完成Queue(异步)
周期性跑一段代码Cron Triggers
多步、跨小时/天、要断点续Workflows
单步但要 fanout 并发Cron 调度 + Queue 消费
实时协同 / 会话状态Durable Object(不是本章,但同类)

实战组合:用户注册后的全套异步

用户 POST /signup │ ▼ Worker 响应 200 + 触发: │ ├─→ EMAIL_Q.send(欢迎邮件) ← Queue 异步发 ├─→ ONBOARD_WF.create(new user) ← Workflow 跑 7 天引导 │ │ │ ├─ 立刻:送新人礼包 │ ├─ 1 天后:发 tips 邮件 │ ├─ 3 天后:检查激活,未激活送优惠券 │ └─ 7 天后:收尾报告入 analytics └─→ DB.insert(user) Cron 每天 02:00: 扫描"7 天未登录用户" → 塞入 EMAIL_Q 批量发唤醒邮件

本章小结