Chapter 08

后台任务与队列

BullMQ 任务队列、定时任务、Worker 线程——构建健壮的异步处理系统

为什么需要任务队列

用户注册后发送欢迎邮件,如果邮件服务商 API 超时 5 秒,用户要等 5 秒才能看到"注册成功"——这是不可接受的。解决方案:将"发送邮件"任务放入队列,立即响应用户,由后台 Worker 异步处理。

BullMQ 快速上手

bun add bullmq ioredis
// queues/email.ts — 邮件发送队列
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null, // BullMQ 要求设置为 null
});

// 定义任务数据类型
type EmailJobData = {
  to: string;
  subject: string;
  template: string;
  variables: Record<string, string>;
};

// 创建队列
export const emailQueue = new Queue<EmailJobData>('email', {
  connection,
  defaultJobOptions: {
    attempts: 3,               // 最多重试 3 次
    backoff: {
      type: 'exponential',   // 指数退避:1s → 2s → 4s
      delay: 1000,
    },
    removeOnComplete: { count: 1000 }, // 保留最近 1000 条成功记录
    removeOnFail: { count: 5000 },     // 保留最近 5000 条失败记录
  },
});

// 添加任务(生产者)
export async function sendWelcomeEmail(userId: string, email: string) {
  await emailQueue.add(
    'welcome',          // 任务名称
    { to: email, subject: '欢迎加入!', template: 'welcome', variables: { userId } },
    { delay: 5000 }     // 延迟 5 秒执行
  );
}

export async function sendPasswordResetEmail(email: string, token: string) {
  await emailQueue.add(
    'password-reset',
    { to: email, subject: '密码重置', template: 'reset', variables: { token } },
    { priority: 1 }     // 高优先级(数字越小优先级越高)
  );
}

Worker(消费者)

// workers/email.worker.ts
import { Worker } from 'bullmq';
import { sendEmail } from '../lib/mailer';

const emailWorker = new Worker(
  'email',
  async (job) => {
    console.log(`处理任务 [${job.name}] id:${job.id}`);

    const { to, subject, template, variables } = job.data;

    // 更新进度(可实时查询)
    await job.updateProgress(10);

    const html = await renderTemplate(template, variables);
    await job.updateProgress(50);

    await sendEmail({ to, subject, html });
    await job.updateProgress(100);

    return { sent: true, timestamp: Date.now() }; // 返回值存入 job.returnvalue
  },
  {
    connection,
    concurrency: 5,      // 同时处理 5 个任务
    limiter: {
      max: 100,          // 速率限制:每分钟最多 100 个任务
      duration: 60000,
    },
  }
);

// 事件监听
emailWorker.on('completed', (job) => {
  console.log(`✓ 任务 ${job.id} 完成`);
});

emailWorker.on('failed', (job, err) => {
  console.error(`✗ 任务 ${job?.id} 失败 (${job?.attemptsMade} 次):`, err.message);
});

任务依赖(Flow)

BullMQ Flow 允许定义有依赖关系的任务树——父任务等待所有子任务完成后才执行:

import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({ connection });

// 用户注册流程:先发短信+邮件,全部成功后发统计
await flowProducer.add({
  name: 'register-analytics',
  queueName: 'analytics',
  data: { userId: 'user-123' },
  children: [
    { name: 'welcome-email', queueName: 'email',
      data: { to: 'user@example.com' } },
    { name: 'welcome-sms', queueName: 'sms',
      data: { phone: '+86138xxx' } },
  ],
});

定时任务(node-cron)

bun add node-cron
bun add -d @types/node-cron
import cron from 'node-cron';

// Cron 表达式:秒 分 时 日 月 周
// 每天凌晨2点清理过期 token
cron.schedule('0 2 * * *', async () => {
  console.log('开始清理过期数据...');
  const deleted = await prisma.refreshToken.deleteMany({
    where: { expiresAt: { lt: new Date() } },
  });
  console.log(`清理完成,删除 ${deleted.count} 条过期 token`);
});

// 每小时生成统计报告
cron.schedule('0 * * * *', async () => {
  await generateHourlyStats();
});

// 每5分钟检查系统健康
cron.schedule('*/5 * * * *', async () => {
  await healthCheck();
}, {
  scheduled: true,
  timezone: 'Asia/Shanghai'  // 指定时区!
});

多节点定时任务问题:如果有多个服务器实例,node-cron 会在每个实例上都运行——同一任务执行多次!解决方案:① 只在一个实例运行 cron(不推荐);② 使用分布式锁(Redis SETNX);③ 使用 BullMQ 的 Repeat 功能(推荐)。

BullMQ Repeat — 分布式定时任务

// 用 BullMQ Repeat 替代 node-cron(自动保证只执行一次)
await emailQueue.add(
  'weekly-newsletter',
  { template: 'newsletter' },
  {
    repeat: {
      pattern: '0 9 * * 1',  // 每周一上午9点
      tz: 'Asia/Shanghai',
    },
  }
);

Worker 线程:处理 CPU 密集任务

Node.js/Bun 的事件循环是单线程的。执行耗时计算(图像处理、加密、数据分析)会阻塞所有请求。解决方案:Worker Threads。

// workers/image-processor.ts(Worker 线程脚本)
import { parentPort, workerData } from 'node:worker_threads';
import sharp from 'sharp';

const { inputPath, outputPath, width, height } = workerData;

try {
  await sharp(inputPath)
    .resize(width, height, { fit: 'cover' })
    .webp({ quality: 85 })
    .toFile(outputPath);

  parentPort?.postMessage({ success: true, outputPath });
} catch (err) {
  parentPort?.postMessage({ success: false, error: err.message });
}
// 主线程:创建 Worker 线程
import { Worker } from 'node:worker_threads';

function processImage(input: string, output: string, w: number, h: number) {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./workers/image-processor.ts', {
      workerData: { inputPath: input, outputPath: output, width: w, height: h },
    });

    worker.on('message', (result) => {
      if (result.success) resolve(result.outputPath);
      else reject(new Error(result.error));
      worker.terminate();
    });

    worker.on('error', reject);
  });
}

// 使用:不会阻塞主线程
app.post('/upload', async (c) => {
  const file = await c.req.blob();
  // 保存原图后,在 Worker 线程处理缩略图,主线程立即响应
  processImage(originalPath, thumbPath, 200, 200).catch(console.error);
  return c.json({ id: fileId, status: 'processing' }, 202);
});

队列监控:Bull Dashboard

bun add @bull-board/api @bull-board/hono
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { HonoAdapter } from '@bull-board/hono';

const serverAdapter = new HonoAdapter();
createBullBoard({
  queues: [new BullMQAdapter(emailQueue)],
  serverAdapter,
});
serverAdapter.setBasePath('/admin/queues');

// 挂载到 Hono(建议添加管理员权限中间件)
app.route('/admin/queues', serverAdapter.registerPlugin());

Bull Board 提供了可视化界面,可以实时查看队列状态、任务进度、失败原因,还能手动重试失败任务。生产环境必备监控工具。