Skip to content

课 2 · Worker 消费与失败重试

本课目标

实现本地轮询版 Worker,让它从数据库取出待处理任务、执行导入流程,并处理失败重试和幂等。课后你会拿到一个能在后台运行的 Worker 进程。

关键理解:Worker 就是一个无限循环:不断检查有没有新任务,有就取来处理,没有就等一会再检查。

Worker 的基本结构

最简单的 Worker 就是一个轮询循环:

typescript
// apps/worker/src/index.ts
import { processJob } from './processor'
import { db } from './db'

async function runWorker() {
  console.log('[Worker] 启动,等待任务...')

  while (true) {
    try {
      // 取一个 pending 任务
      const job = await db.claimNextJob()

      if (job) {
        console.log(`[Worker] 开始处理任务 ${job.id}`)
        await processJob(job)
      } else {
        // 没有任务,等 2 秒再查
        await sleep(2000)
      }
    } catch (err) {
      console.error('[Worker] 意外错误', err)
      await sleep(5000) // 出错后等久一点
    }
  }
}

runWorker()

任务领取(防并发重复处理)

多个 Worker 同时运行时,同一个任务可能被多次领取。用数据库事务加乐观锁保证原子性:

typescript
// apps/worker/src/db.ts

// 原子性领取:只有 status=pending 的任务才能被领取
async function claimNextJob(): Promise<IngestJob | null> {
  // SQLite 单进程下这已经足够;多进程场景改用 FOR UPDATE SKIP LOCKED
  const job = await db.get<IngestJob>(
    `SELECT * FROM ingest_jobs
     WHERE status = 'pending'
     ORDER BY created_at ASC
     LIMIT 1`
  )

  if (!job) return null

  const affected = await db.run(
    `UPDATE ingest_jobs
     SET status = 'running', updated_at = ?
     WHERE id = ? AND status = 'pending'`,   // 条件检查,避免重复领取
    [new Date().toISOString(), job.id]
  )

  // 如果其他 Worker 刚领走,affected 为 0,返回 null 继续找下一个
  if (affected.changes === 0) return null

  return { ...job, status: 'running' }
}

任务处理流程

typescript
// apps/worker/src/processor.ts
import type { IngestJob } from '@shared/types/job'

export async function processJob(job: IngestJob): Promise<void> {
  try {
    // 1. 读取文件
    const content = await readFile(job.fileName)

    // 2. 切块
    const chunks = splitIntoChunks(content)
    await db.updateJob(job.id, { totalChunks: chunks.length })

    // 3. 批量 Embedding + 写入向量库
    for (let i = 0; i < chunks.length; i++) {
      await embedAndStore(chunks[i], job.id)
      // 更新进度
      await db.updateJob(job.id, { processedChunks: i + 1 })
    }

    // 4. 写入 Elasticsearch(模块 4 再实现,这里先 placeholder)
    // await esIndexChunks(chunks, job.id)

    // 5. 标记完成
    await db.updateJob(job.id, { status: 'completed' })
    console.log(`[Worker] 任务 ${job.id} 完成,共 ${chunks.length} 个 chunk`)
  } catch (err) {
    await handleJobFailure(job, err)
  }
}

失败重试

typescript
// apps/worker/src/processor.ts
const MAX_RETRIES = 3

async function handleJobFailure(job: IngestJob, err: unknown) {
  const errorMessage = err instanceof Error ? err.message : String(err)
  console.error(`[Worker] 任务 ${job.id} 失败: ${errorMessage}`)

  if (job.retries < MAX_RETRIES) {
    // 还有重试次数:回到 pending,retries + 1
    await db.updateJob(job.id, {
      status: 'pending',
      retries: job.retries + 1,
      error: errorMessage,
    })
    console.log(`[Worker] 任务 ${job.id} 将重试(第 ${job.retries + 1} 次)`)
  } else {
    // 超出重试次数:标记为最终失败
    await db.updateJob(job.id, {
      status: 'failed',
      error: errorMessage,
    })
    console.error(`[Worker] 任务 ${job.id} 超出重试次数,最终失败`)
  }
}

幂等设计

同一个任务如果被处理两次,结果应该一样。确保幂等的关键:写入向量库时用 jobId + chunkIndex 作为唯一 ID,重复写入会覆盖而不是追加:

typescript
async function embedAndStore(chunk: string, jobId: string, index: number) {
  const embedding = await embed(chunk)
  const docId = `${jobId}-${index}` // 幂等 ID

  // upsert:存在则更新,不存在则插入
  await vectorStore.upsert({
    id: docId,
    embedding,
    content: chunk,
    metadata: { jobId, chunkIndex: index },
  })
}

任务状态 API

让前端能轮询进度:

typescript
// apps/api/src/routes/docs.ts
docsRouter.get('/jobs/:id', async (c) => {
  const job = await db.getJob(c.req.param('id'))
  if (!job) return c.json({ error: '任务不存在' }, 404)

  return c.json({
    id: job.id,
    status: job.status,
    progress: job.totalChunks
      ? Math.round((job.processedChunks! / job.totalChunks) * 100)
      : null,
    error: job.error,
  })
})

本节产物

apps/worker/
  src/
    index.ts          # Worker 主循环
    processor.ts      # 任务处理逻辑
    db.ts             # 任务领取与状态更新
  package.json

面试追问

Worker 挂了以后任务怎么办?

关键是"状态恢复":running 的任务如果 Worker 崩溃不会自动变回 pending。生产环境需要一个"任务看门狗":定时扫描 running 状态超过阈值时间(如 10 分钟)的任务,将其重置为 pending 以便重试。这节课的本地 Worker 单进程足够,模块 3 的 BullMQ 内置了心跳检测和僵尸任务恢复。

Embedding 失败如何重试?

把 Embedding 失败归为 processJob 的错误,走统一重试逻辑。注意重试要从当前的 processedChunks 断点续传,不要从头 Embedding 已完成的 chunk(浪费 API 调用费用)。

面向前端工程师和独立开发者的 AI 应用工程课程