Skip to content

课 3 · BullMQ 渐进替换

本课目标

用 BullMQ 替换上节课的本地轮询调度层,获得延迟任务、优先级、并发控制和自动僵尸任务恢复。

先明确:任务处理逻辑(processor.ts)不变,只替换调度层。 这就是上节课把业务逻辑和调度分开的原因。

为什么要从轮询升级到 BullMQ

能力本地轮询BullMQ
延迟任务手写内置
任务优先级手写内置
并发控制手写内置
僵尸任务恢复需要看门狗内置心跳检测
可观测性Bull Board UI
生产可靠性成熟方案

本地轮询版在开发阶段已经足够用来讲清任务模型,BullMQ 是生产就绪的替换方案。

安装

bash
# apps/worker
pnpm add bullmq

# apps/api(用来创建任务)
pnpm add bullmq

BullMQ 依赖 Redis,正好复用模块 1 启动的 Redis 实例。

替换调度层

只需要改两处:

1. API 侧:用 BullMQ Queue 创建任务

typescript
// apps/api/src/lib/queue.ts
import { Queue } from 'bullmq'
import { config } from '../config'

export const ingestQueue = new Queue('ingest', {
  connection: { url: config.redis.url },
})

// 创建任务
export async function enqueueIngestJob(jobData: {
  jobId: string
  fileName: string
  filePath: string
}) {
  await ingestQueue.add('ingest-doc', jobData, {
    jobId: jobData.jobId,       // 用我们自己的 ID,保持幂等
    attempts: 3,                // 最多重试 3 次
    backoff: {
      type: 'exponential',
      delay: 2000,              // 初始等待 2s,指数退避
    },
  })
}

2. Worker 侧:用 BullMQ Worker 替换轮询

typescript
// apps/worker/src/index.ts
import { Worker } from 'bullmq'
import { processJob } from './processor'
import { config } from './config'

const worker = new Worker(
  'ingest',
  async (job) => {
    // job.data 就是 API 侧 enqueue 时传入的 jobData
    await processJob({
      id: job.data.jobId,
      fileName: job.data.fileName,
      filePath: job.data.filePath,
      retries: job.attemptsMade,
      status: 'running',
      // ...
    })
  },
  {
    connection: { url: config.redis.url },
    concurrency: 2,              // 最多同时处理 2 个任务
  },
)

worker.on('completed', (job) => {
  console.log(`[Worker] 任务 ${job.id} 完成`)
})

worker.on('failed', (job, err) => {
  console.error(`[Worker] 任务 ${job?.id} 失败`, err.message)
})

注意:processor.ts 的代码完全不变,这就是解耦的价值。

任务监控 UI(Bull Board)

bash
pnpm add @bull-board/hono @bull-board/api
typescript
// apps/api/src/index.ts(新增)
import { createBullBoard } from '@bull-board/api'
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'
import { HonoAdapter } from '@bull-board/hono'
import { ingestQueue } from './lib/queue'

const serverAdapter = new HonoAdapter(serveStatic)
createBullBoard({
  queues: [new BullMQAdapter(ingestQueue)],
  serverAdapter,
})

// 只在开发模式挂载监控 UI
if (process.env.NODE_ENV !== 'production') {
  app.route('/ui/queues', serverAdapter.registerPlugin())
}

访问 http://localhost:3000/ui/queues 可以看到任务队列状态、失败任务详情、重试按钮。

迁移策略

从本地轮询迁到 BullMQ 的步骤:

  1. 启动 Redis(模块 1 已配置,pnpm dev:infra
  2. 安装 bullmq
  3. 替换 apps/api/src/routes/docs.ts 中的 db.insertJob()enqueueIngestJob()
  4. 替换 apps/worker/src/index.ts 的轮询循环为 BullMQ Worker
  5. 旧的 SQLite 任务表可以保留,用来记录已完成任务的历史(BullMQ 默认只保留最近 N 条)

本节产物

apps/api/src/lib/
  queue.ts            # BullMQ Queue + enqueue 封装
apps/worker/src/
  index.ts            # BullMQ Worker(替换轮询版)

面试追问

BullMQ 和 Kafka 什么时候选哪个?

BullMQ(基于 Redis)适合:任务量中等、需要延迟/优先级/重试、团队熟悉 Redis、不想引入新的基础设施。Kafka 适合:海量消息流、多消费者组、消息持久化几个月、需要精确的消息回放。进阶项目规模用 BullMQ 足够,面试时说清楚你的选型理由就行。

BullMQ 的 concurrency: 2 是什么意思?

一个 Worker 进程同时处理 2 个任务(并发 2)。适用于 Embedding 这种 I/O 密集型任务:一个任务在等 API 返回时,另一个可以并行跑。CPU 密集型任务(如 PDF 解析)不适合高并发,容易打满 CPU。

面向前端工程师和独立开发者的 AI 应用工程课程