Appearance
课 2 · Worker 消费与失败重试
本课目标
实现本地轮询版 Worker,让它从数据库取出待处理任务、执行导入流程,并处理失败重试和幂等。课后你会拿到一个能在后台运行的 Worker 进程。
关键理解:Worker 就是一个无限循环:不断检查有没有新任务,有就取来处理,没有就等一会再检查。
Worker 的基本结构
最简单的 Worker 就是一个轮询循环:
typescript
// apps/worker/src/index.ts
import { processJob } from './processor'
import { db } from './db'
async function runWorker() {
console.log('[Worker] 启动,等待任务...')
while (true) {
try {
// 取一个 pending 任务
const job = await db.claimNextJob()
if (job) {
console.log(`[Worker] 开始处理任务 ${job.id}`)
await processJob(job)
} else {
// 没有任务,等 2 秒再查
await sleep(2000)
}
} catch (err) {
console.error('[Worker] 意外错误', err)
await sleep(5000) // 出错后等久一点
}
}
}
runWorker()任务领取(防并发重复处理)
多个 Worker 同时运行时,同一个任务可能被多次领取。用数据库事务加乐观锁保证原子性:
typescript
// apps/worker/src/db.ts
// 原子性领取:只有 status=pending 的任务才能被领取
async function claimNextJob(): Promise<IngestJob | null> {
// SQLite 单进程下这已经足够;多进程场景改用 FOR UPDATE SKIP LOCKED
const job = await db.get<IngestJob>(
`SELECT * FROM ingest_jobs
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 1`
)
if (!job) return null
const affected = await db.run(
`UPDATE ingest_jobs
SET status = 'running', updated_at = ?
WHERE id = ? AND status = 'pending'`, // 条件检查,避免重复领取
[new Date().toISOString(), job.id]
)
// 如果其他 Worker 刚领走,affected 为 0,返回 null 继续找下一个
if (affected.changes === 0) return null
return { ...job, status: 'running' }
}任务处理流程
typescript
// apps/worker/src/processor.ts
import type { IngestJob } from '@shared/types/job'
export async function processJob(job: IngestJob): Promise<void> {
try {
// 1. 读取文件
const content = await readFile(job.fileName)
// 2. 切块
const chunks = splitIntoChunks(content)
await db.updateJob(job.id, { totalChunks: chunks.length })
// 3. 批量 Embedding + 写入向量库
for (let i = 0; i < chunks.length; i++) {
await embedAndStore(chunks[i], job.id)
// 更新进度
await db.updateJob(job.id, { processedChunks: i + 1 })
}
// 4. 写入 Elasticsearch(模块 4 再实现,这里先 placeholder)
// await esIndexChunks(chunks, job.id)
// 5. 标记完成
await db.updateJob(job.id, { status: 'completed' })
console.log(`[Worker] 任务 ${job.id} 完成,共 ${chunks.length} 个 chunk`)
} catch (err) {
await handleJobFailure(job, err)
}
}失败重试
typescript
// apps/worker/src/processor.ts
const MAX_RETRIES = 3
async function handleJobFailure(job: IngestJob, err: unknown) {
const errorMessage = err instanceof Error ? err.message : String(err)
console.error(`[Worker] 任务 ${job.id} 失败: ${errorMessage}`)
if (job.retries < MAX_RETRIES) {
// 还有重试次数:回到 pending,retries + 1
await db.updateJob(job.id, {
status: 'pending',
retries: job.retries + 1,
error: errorMessage,
})
console.log(`[Worker] 任务 ${job.id} 将重试(第 ${job.retries + 1} 次)`)
} else {
// 超出重试次数:标记为最终失败
await db.updateJob(job.id, {
status: 'failed',
error: errorMessage,
})
console.error(`[Worker] 任务 ${job.id} 超出重试次数,最终失败`)
}
}幂等设计
同一个任务如果被处理两次,结果应该一样。确保幂等的关键:写入向量库时用 jobId + chunkIndex 作为唯一 ID,重复写入会覆盖而不是追加:
typescript
async function embedAndStore(chunk: string, jobId: string, index: number) {
const embedding = await embed(chunk)
const docId = `${jobId}-${index}` // 幂等 ID
// upsert:存在则更新,不存在则插入
await vectorStore.upsert({
id: docId,
embedding,
content: chunk,
metadata: { jobId, chunkIndex: index },
})
}任务状态 API
让前端能轮询进度:
typescript
// apps/api/src/routes/docs.ts
docsRouter.get('/jobs/:id', async (c) => {
const job = await db.getJob(c.req.param('id'))
if (!job) return c.json({ error: '任务不存在' }, 404)
return c.json({
id: job.id,
status: job.status,
progress: job.totalChunks
? Math.round((job.processedChunks! / job.totalChunks) * 100)
: null,
error: job.error,
})
})本节产物
apps/worker/
src/
index.ts # Worker 主循环
processor.ts # 任务处理逻辑
db.ts # 任务领取与状态更新
package.json面试追问
Worker 挂了以后任务怎么办?
关键是"状态恢复":running 的任务如果 Worker 崩溃不会自动变回 pending。生产环境需要一个"任务看门狗":定时扫描 running 状态超过阈值时间(如 10 分钟)的任务,将其重置为 pending 以便重试。这节课的本地 Worker 单进程足够,模块 3 的 BullMQ 内置了心跳检测和僵尸任务恢复。
Embedding 失败如何重试?
把 Embedding 失败归为 processJob 的错误,走统一重试逻辑。注意重试要从当前的 processedChunks 断点续传,不要从头 Embedding 已完成的 chunk(浪费 API 调用费用)。