Appearance
课 3 · BullMQ 渐进替换
本课目标
用 BullMQ 替换上节课的本地轮询调度层,获得延迟任务、优先级、并发控制和自动僵尸任务恢复。
先明确:任务处理逻辑(processor.ts)不变,只替换调度层。 这就是上节课把业务逻辑和调度分开的原因。
为什么要从轮询升级到 BullMQ
| 能力 | 本地轮询 | BullMQ |
|---|---|---|
| 延迟任务 | 手写 | 内置 |
| 任务优先级 | 手写 | 内置 |
| 并发控制 | 手写 | 内置 |
| 僵尸任务恢复 | 需要看门狗 | 内置心跳检测 |
| 可观测性 | 无 | Bull Board UI |
| 生产可靠性 | 弱 | 成熟方案 |
本地轮询版在开发阶段已经足够用来讲清任务模型,BullMQ 是生产就绪的替换方案。
安装
bash
# apps/worker
pnpm add bullmq
# apps/api(用来创建任务)
pnpm add bullmqBullMQ 依赖 Redis,正好复用模块 1 启动的 Redis 实例。
替换调度层
只需要改两处:
1. API 侧:用 BullMQ Queue 创建任务
typescript
// apps/api/src/lib/queue.ts
import { Queue } from 'bullmq'
import { config } from '../config'
export const ingestQueue = new Queue('ingest', {
connection: { url: config.redis.url },
})
// 创建任务
export async function enqueueIngestJob(jobData: {
jobId: string
fileName: string
filePath: string
}) {
await ingestQueue.add('ingest-doc', jobData, {
jobId: jobData.jobId, // 用我们自己的 ID,保持幂等
attempts: 3, // 最多重试 3 次
backoff: {
type: 'exponential',
delay: 2000, // 初始等待 2s,指数退避
},
})
}2. Worker 侧:用 BullMQ Worker 替换轮询
typescript
// apps/worker/src/index.ts
import { Worker } from 'bullmq'
import { processJob } from './processor'
import { config } from './config'
const worker = new Worker(
'ingest',
async (job) => {
// job.data 就是 API 侧 enqueue 时传入的 jobData
await processJob({
id: job.data.jobId,
fileName: job.data.fileName,
filePath: job.data.filePath,
retries: job.attemptsMade,
status: 'running',
// ...
})
},
{
connection: { url: config.redis.url },
concurrency: 2, // 最多同时处理 2 个任务
},
)
worker.on('completed', (job) => {
console.log(`[Worker] 任务 ${job.id} 完成`)
})
worker.on('failed', (job, err) => {
console.error(`[Worker] 任务 ${job?.id} 失败`, err.message)
})注意:processor.ts 的代码完全不变,这就是解耦的价值。
任务监控 UI(Bull Board)
bash
pnpm add @bull-board/hono @bull-board/apitypescript
// apps/api/src/index.ts(新增)
import { createBullBoard } from '@bull-board/api'
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'
import { HonoAdapter } from '@bull-board/hono'
import { ingestQueue } from './lib/queue'
const serverAdapter = new HonoAdapter(serveStatic)
createBullBoard({
queues: [new BullMQAdapter(ingestQueue)],
serverAdapter,
})
// 只在开发模式挂载监控 UI
if (process.env.NODE_ENV !== 'production') {
app.route('/ui/queues', serverAdapter.registerPlugin())
}访问 http://localhost:3000/ui/queues 可以看到任务队列状态、失败任务详情、重试按钮。
迁移策略
从本地轮询迁到 BullMQ 的步骤:
- 启动 Redis(模块 1 已配置,
pnpm dev:infra) - 安装
bullmq - 替换
apps/api/src/routes/docs.ts中的db.insertJob()为enqueueIngestJob() - 替换
apps/worker/src/index.ts的轮询循环为 BullMQ Worker - 旧的 SQLite 任务表可以保留,用来记录已完成任务的历史(BullMQ 默认只保留最近 N 条)
本节产物
apps/api/src/lib/
queue.ts # BullMQ Queue + enqueue 封装
apps/worker/src/
index.ts # BullMQ Worker(替换轮询版)面试追问
BullMQ 和 Kafka 什么时候选哪个?
BullMQ(基于 Redis)适合:任务量中等、需要延迟/优先级/重试、团队熟悉 Redis、不想引入新的基础设施。Kafka 适合:海量消息流、多消费者组、消息持久化几个月、需要精确的消息回放。进阶项目规模用 BullMQ 足够,面试时说清楚你的选型理由就行。
BullMQ 的 concurrency: 2 是什么意思?
一个 Worker 进程同时处理 2 个任务(并发 2)。适用于 Embedding 这种 I/O 密集型任务:一个任务在等 API 返回时,另一个可以并行跑。CPU 密集型任务(如 PDF 解析)不适合高并发,容易打满 CPU。