Appearance
课 1 · 任务建模与状态机
本课目标
理解为什么文档导入不能放在 HTTP 请求里,用状态机建模 ingest job,设计合理的任务数据模型。
先记住一句话:用户上传文档后,请求应该立刻返回任务 ID;真正的工作由后台 Worker 完成。
为什么文档导入不能同步处理
一个文档导入请求要做多少事?
同步处理的问题
用户上传文档
→
解析 PDF几秒
→
切块 + 清洗几秒
→
批量 Embedding几十秒
→
写入索引几秒
→
返回成功
总耗时可能超过 30 秒,HTTP 超时,用户盯着空白页等待。
同步处理有三个问题:
- 超时:HTTP 请求超时通常 30-60 秒,大文档处理超时就失败
- 阻塞:处理期间 API 进程被占用,其他请求排队
- 无法重试:一旦失败,整个流程要从头来,用户感知不到进度
异步任务模型
正确做法是请求和处理分离:
异步任务模型
用户上传
→
API 立即返回jobId: "abc-123"
→
任务入队pending
Worker 轮询
→
取出任务running
→
处理完成completed
API 只负责创建任务,Worker 负责执行,两者完全解耦。
任务状态设计
一个 ingest job 的完整生命周期:
pending → running → completed
↘ failed → (retry → running)| 状态 | 含义 |
|---|---|
pending | 已创建,等待 Worker 取走 |
running | Worker 正在处理 |
completed | 处理成功,文档已可查询 |
failed | 处理失败,记录了错误原因 |
任务数据模型
typescript
// packages/shared/src/types/job.ts
export type JobStatus = 'pending' | 'running' | 'completed' | 'failed'
export interface IngestJob {
id: string
status: JobStatus
fileName: string
fileSize: number
totalChunks?: number // 切块总数(完成后才有)
processedChunks?: number // 已处理切块数(进度)
error?: string // 失败原因
retries: number // 已重试次数
createdAt: Date
updatedAt: Date
}SQLite 存储(用基础课的 sqlite-vec,无需新增依赖):
sql
CREATE TABLE ingest_jobs (
id TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'pending',
file_name TEXT NOT NULL,
file_size INTEGER NOT NULL,
total_chunks INTEGER,
processed_chunks INTEGER DEFAULT 0,
error TEXT,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX idx_ingest_jobs_status ON ingest_jobs(status);SQLite 的多进程限制
上面的 SQLite 方案在单进程下完全可行。但进阶项目里 API 进程和 Worker 进程同时运行,两个进程并发写同一个 SQLite 文件会触发写锁竞争,轻则性能退化,重则 SQLITE_BUSY 报错导致任务丢失。
解决方法是把存储换成 Postgres(已在模块 1 的 Docker Compose 中启动):
用 Drizzle 定义 Schema(schema-first,TypeScript 类型自动推导):
bash
pnpm add drizzle-orm postgres # postgres = postgres-js 驱动(更轻量现代)
pnpm add -D drizzle-kitDrizzle 的两个层次
- schema 层(
drizzle-orm/pg-core):定义列类型,pgTable、pgEnum、text等都从这里导入,跟驱动无关 - 驱动层(
drizzle-orm/postgres-js或drizzle-orm/node-postgres):在client.ts里选一次。进阶课用postgres-js(postgresnpm 包),轻量、支持 tagged template,不需要额外安装pg
typescript
// packages/shared/src/db/schema.ts
import { pgTable, pgEnum, text, integer, timestamp } from 'drizzle-orm/pg-core'
export const jobStatusEnum = pgEnum('job_status', ['pending', 'running', 'completed', 'failed'])
export const ingestJobs = pgTable('ingest_jobs', {
id: text('id').primaryKey(),
status: jobStatusEnum('status').notNull().default('pending'),
fileName: text('file_name').notNull(),
fileSize: integer('file_size').notNull(),
totalChunks: integer('total_chunks'),
processedChunks: integer('processed_chunks').notNull().default(0),
error: text('error'),
retries: integer('retries').notNull().default(0),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
updatedAt: timestamp('updated_at', { withTimezone: true }).notNull().defaultNow(),
})Schema 即文档,建表语句由 drizzle-kit 根据 schema 自动生成,不需要手写 SQL DDL。
Drizzle 客户端:
typescript
// apps/api/src/db/client.ts
import postgres from 'postgres'
import { drizzle } from 'drizzle-orm/postgres-js'
import * as schema from '@knowledgeops/shared/db/schema'
import { config } from '../config'
const client = postgres(config.database.url)
export const db = drizzle(client, { schema })迁移管理(drizzle-kit):
typescript
// drizzle.config.ts(项目根目录)
import { defineConfig } from 'drizzle-kit'
export default defineConfig({
schema: './packages/shared/src/db/schema.ts',
out: './drizzle/migrations',
dialect: 'postgresql',
dbCredentials: { url: process.env.DATABASE_URL! },
})bash
pnpm drizzle-kit generate # 根据 schema 生成 SQL migration 文件
pnpm drizzle-kit migrate # 应用迁移到数据库API 端的任务创建
typescript
// apps/api/src/routes/docs.ts
import { randomUUID } from 'node:crypto'
docsRouter.post('/upload', async (c) => {
const body = await c.req.parseBody()
const file = body['file'] as File
// 保存文件
const filePath = await saveFile(file)
// 创建任务,立即返回
const job: IngestJob = {
id: randomUUID(),
status: 'pending',
fileName: file.name,
fileSize: file.size,
retries: 0,
createdAt: new Date(),
updatedAt: new Date(),
}
await db.insertJob(job)
// 立即返回 jobId,不等待处理
return c.json({ jobId: job.id, status: 'pending' }, 202)
})
// 查询任务状态
docsRouter.get('/jobs/:id', async (c) => {
const job = await db.getJob(c.req.param('id'))
if (!job) return c.json({ error: '任务不存在' }, 404)
return c.json(job)
})db.insertJob / db.getJob 的 Drizzle 实现:
typescript
// apps/api/src/db/jobs.ts
import { eq } from 'drizzle-orm'
import { db } from './client'
import { ingestJobs } from '@knowledgeops/shared/db/schema'
import type { IngestJob } from '@knowledgeops/shared'
export async function insertJob(job: IngestJob) {
await db.insert(ingestJobs).values({
id: job.id,
status: job.status,
fileName: job.fileName,
fileSize: job.fileSize,
retries: job.retries,
createdAt: job.createdAt,
updatedAt: job.updatedAt,
})
}
export async function getJob(id: string) {
const rows = await db
.select()
.from(ingestJobs)
.where(eq(ingestJobs.id, id))
return rows[0] ?? null
}返回类型由 Drizzle 根据 schema 自动推断,不需要手写 IngestJob 泛型。
本节产物
packages/shared/src/
db/schema.ts # Drizzle schema(ingestJobs 表定义)
types/job.ts # IngestJob 类型定义
drizzle/
migrations/ # drizzle-kit 生成的 SQL migration 文件
drizzle.config.ts # drizzle-kit 配置
apps/api/src/
routes/docs.ts # 文档上传接口(返回 jobId)
db/client.ts # Drizzle 客户端
db/jobs.ts # 任务 CRUD面试追问
任务队列和消息队列有什么区别?
消息队列(如 Kafka、RabbitMQ)侧重消息传递和发布订阅;任务队列(如 BullMQ)侧重任务调度、优先级、重试和延迟执行。任务队列通常基于消息队列实现,但提供了更多面向任务的抽象。进阶项目用 BullMQ(基于 Redis),开发简单但功能完整。
为什么 API 返回 202 而不是 200?
HTTP 202 Accepted 的语义是"请求已接受,但尚未处理完成"。这比 200 更准确——文档还没真的导入成功,只是接受了导入请求。