为什么需要任务队列
用户注册后发送欢迎邮件,如果邮件服务商 API 超时 5 秒,用户要等 5 秒才能看到"注册成功"——这是不可接受的。解决方案:将"发送邮件"任务放入队列,立即响应用户,由后台 Worker 异步处理。
- 任务队列 一种先进先出(FIFO)的数据结构,生产者(Producer)将任务推入队列,消费者(Worker)从队列取出任务执行。常用 Redis 作为持久化存储。
- BullMQ 基于 Redis 的高性能任务队列库,是 Bull 的 TypeScript 原生继任者。支持优先级、延迟任务、失败重试、任务依赖、流量限制等企业级功能。
- Worker 任务消费者,持续监听队列,取出任务执行。可以横向扩展(多个 Worker 并发处理同一队列)。
- Dead Letter Queue 死信队列,存放多次重试失败的任务。避免任务永远重试阻塞队列,同时保留失败记录便于人工处理。
BullMQ 快速上手
bun add bullmq ioredis
// queues/email.ts — 邮件发送队列
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // BullMQ 要求设置为 null
});
// 定义任务数据类型
type EmailJobData = {
to: string;
subject: string;
template: string;
variables: Record<string, string>;
};
// 创建队列
export const emailQueue = new Queue<EmailJobData>('email', {
connection,
defaultJobOptions: {
attempts: 3, // 最多重试 3 次
backoff: {
type: 'exponential', // 指数退避:1s → 2s → 4s
delay: 1000,
},
removeOnComplete: { count: 1000 }, // 保留最近 1000 条成功记录
removeOnFail: { count: 5000 }, // 保留最近 5000 条失败记录
},
});
// 添加任务(生产者)
export async function sendWelcomeEmail(userId: string, email: string) {
await emailQueue.add(
'welcome', // 任务名称
{ to: email, subject: '欢迎加入!', template: 'welcome', variables: { userId } },
{ delay: 5000 } // 延迟 5 秒执行
);
}
export async function sendPasswordResetEmail(email: string, token: string) {
await emailQueue.add(
'password-reset',
{ to: email, subject: '密码重置', template: 'reset', variables: { token } },
{ priority: 1 } // 高优先级(数字越小优先级越高)
);
}
Worker(消费者)
// workers/email.worker.ts
import { Worker } from 'bullmq';
import { sendEmail } from '../lib/mailer';
const emailWorker = new Worker(
'email',
async (job) => {
console.log(`处理任务 [${job.name}] id:${job.id}`);
const { to, subject, template, variables } = job.data;
// 更新进度(可实时查询)
await job.updateProgress(10);
const html = await renderTemplate(template, variables);
await job.updateProgress(50);
await sendEmail({ to, subject, html });
await job.updateProgress(100);
return { sent: true, timestamp: Date.now() }; // 返回值存入 job.returnvalue
},
{
connection,
concurrency: 5, // 同时处理 5 个任务
limiter: {
max: 100, // 速率限制:每分钟最多 100 个任务
duration: 60000,
},
}
);
// 事件监听
emailWorker.on('completed', (job) => {
console.log(`✓ 任务 ${job.id} 完成`);
});
emailWorker.on('failed', (job, err) => {
console.error(`✗ 任务 ${job?.id} 失败 (${job?.attemptsMade} 次):`, err.message);
});
任务依赖(Flow)
BullMQ Flow 允许定义有依赖关系的任务树——父任务等待所有子任务完成后才执行:
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({ connection });
// 用户注册流程:先发短信+邮件,全部成功后发统计
await flowProducer.add({
name: 'register-analytics',
queueName: 'analytics',
data: { userId: 'user-123' },
children: [
{ name: 'welcome-email', queueName: 'email',
data: { to: 'user@example.com' } },
{ name: 'welcome-sms', queueName: 'sms',
data: { phone: '+86138xxx' } },
],
});
定时任务(node-cron)
bun add node-cron
bun add -d @types/node-cron
import cron from 'node-cron';
// Cron 表达式:秒 分 时 日 月 周
// 每天凌晨2点清理过期 token
cron.schedule('0 2 * * *', async () => {
console.log('开始清理过期数据...');
const deleted = await prisma.refreshToken.deleteMany({
where: { expiresAt: { lt: new Date() } },
});
console.log(`清理完成,删除 ${deleted.count} 条过期 token`);
});
// 每小时生成统计报告
cron.schedule('0 * * * *', async () => {
await generateHourlyStats();
});
// 每5分钟检查系统健康
cron.schedule('*/5 * * * *', async () => {
await healthCheck();
}, {
scheduled: true,
timezone: 'Asia/Shanghai' // 指定时区!
});
多节点定时任务问题:如果有多个服务器实例,node-cron 会在每个实例上都运行——同一任务执行多次!解决方案:① 只在一个实例运行 cron(不推荐);② 使用分布式锁(Redis SETNX);③ 使用 BullMQ 的 Repeat 功能(推荐)。
BullMQ Repeat — 分布式定时任务
// 用 BullMQ Repeat 替代 node-cron(自动保证只执行一次)
await emailQueue.add(
'weekly-newsletter',
{ template: 'newsletter' },
{
repeat: {
pattern: '0 9 * * 1', // 每周一上午9点
tz: 'Asia/Shanghai',
},
}
);
Worker 线程:处理 CPU 密集任务
Node.js/Bun 的事件循环是单线程的。执行耗时计算(图像处理、加密、数据分析)会阻塞所有请求。解决方案:Worker Threads。
// workers/image-processor.ts(Worker 线程脚本)
import { parentPort, workerData } from 'node:worker_threads';
import sharp from 'sharp';
const { inputPath, outputPath, width, height } = workerData;
try {
await sharp(inputPath)
.resize(width, height, { fit: 'cover' })
.webp({ quality: 85 })
.toFile(outputPath);
parentPort?.postMessage({ success: true, outputPath });
} catch (err) {
parentPort?.postMessage({ success: false, error: err.message });
}
// 主线程:创建 Worker 线程
import { Worker } from 'node:worker_threads';
function processImage(input: string, output: string, w: number, h: number) {
return new Promise((resolve, reject) => {
const worker = new Worker('./workers/image-processor.ts', {
workerData: { inputPath: input, outputPath: output, width: w, height: h },
});
worker.on('message', (result) => {
if (result.success) resolve(result.outputPath);
else reject(new Error(result.error));
worker.terminate();
});
worker.on('error', reject);
});
}
// 使用:不会阻塞主线程
app.post('/upload', async (c) => {
const file = await c.req.blob();
// 保存原图后,在 Worker 线程处理缩略图,主线程立即响应
processImage(originalPath, thumbPath, 200, 200).catch(console.error);
return c.json({ id: fileId, status: 'processing' }, 202);
});
队列监控:Bull Dashboard
bun add @bull-board/api @bull-board/hono
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { HonoAdapter } from '@bull-board/hono';
const serverAdapter = new HonoAdapter();
createBullBoard({
queues: [new BullMQAdapter(emailQueue)],
serverAdapter,
});
serverAdapter.setBasePath('/admin/queues');
// 挂载到 Hono(建议添加管理员权限中间件)
app.route('/admin/queues', serverAdapter.registerPlugin());
Bull Board 提供了可视化界面,可以实时查看队列状态、任务进度、失败原因,还能手动重试失败任务。生产环境必备监控工具。