Skip to content

课 1 · 任务建模与状态机

本课目标

理解为什么文档导入不能放在 HTTP 请求里,用状态机建模 ingest job,设计合理的任务数据模型。

先记住一句话:用户上传文档后,请求应该立刻返回任务 ID;真正的工作由后台 Worker 完成。

为什么文档导入不能同步处理

一个文档导入请求要做多少事?

同步处理有三个问题:

  1. 超时:HTTP 请求超时通常 30-60 秒,大文档处理超时就失败
  2. 阻塞:处理期间 API 进程被占用,其他请求排队
  3. 无法重试:一旦失败,整个流程要从头来,用户感知不到进度

异步任务模型

正确做法是请求和处理分离:

任务状态设计

一个 ingest job 的完整生命周期:

pending → running → completed
                 ↘ failed → (retry → running)
状态含义
pending已创建,等待 Worker 取走
runningWorker 正在处理
completed处理成功,文档已可查询
failed处理失败,记录了错误原因

任务数据模型

typescript
// packages/shared/src/types/job.ts
export type JobStatus = 'pending' | 'running' | 'completed' | 'failed'

export interface IngestJob {
  id: string
  status: JobStatus
  fileName: string
  fileSize: number
  totalChunks?: number          // 切块总数(完成后才有)
  processedChunks?: number      // 已处理切块数(进度)
  error?: string                // 失败原因
  retries: number               // 已重试次数
  createdAt: Date
  updatedAt: Date
}

SQLite 存储(用基础课的 sqlite-vec,无需新增依赖):

sql
CREATE TABLE ingest_jobs (
  id TEXT PRIMARY KEY,
  status TEXT NOT NULL DEFAULT 'pending',
  file_name TEXT NOT NULL,
  file_size INTEGER NOT NULL,
  total_chunks INTEGER,
  processed_chunks INTEGER DEFAULT 0,
  error TEXT,
  retries INTEGER NOT NULL DEFAULT 0,
  created_at TEXT NOT NULL,
  updated_at TEXT NOT NULL
);

CREATE INDEX idx_ingest_jobs_status ON ingest_jobs(status);

SQLite 的多进程限制

上面的 SQLite 方案在单进程下完全可行。但进阶项目里 API 进程和 Worker 进程同时运行,两个进程并发写同一个 SQLite 文件会触发写锁竞争,轻则性能退化,重则 SQLITE_BUSY 报错导致任务丢失。

解决方法是把存储换成 Postgres(已在模块 1 的 Docker Compose 中启动):

用 Drizzle 定义 Schema(schema-first,TypeScript 类型自动推导):

bash
pnpm add drizzle-orm postgres   # postgres = postgres-js 驱动(更轻量现代)
pnpm add -D drizzle-kit

Drizzle 的两个层次

  • schema 层drizzle-orm/pg-core):定义列类型,pgTablepgEnumtext 等都从这里导入,跟驱动无关
  • 驱动层drizzle-orm/postgres-jsdrizzle-orm/node-postgres):在 client.ts 里选一次。进阶课用 postgres-jspostgres npm 包),轻量、支持 tagged template,不需要额外安装 pg
typescript
// packages/shared/src/db/schema.ts
import { pgTable, pgEnum, text, integer, timestamp } from 'drizzle-orm/pg-core'

export const jobStatusEnum = pgEnum('job_status', ['pending', 'running', 'completed', 'failed'])

export const ingestJobs = pgTable('ingest_jobs', {
  id:              text('id').primaryKey(),
  status:          jobStatusEnum('status').notNull().default('pending'),
  fileName:        text('file_name').notNull(),
  fileSize:        integer('file_size').notNull(),
  totalChunks:     integer('total_chunks'),
  processedChunks: integer('processed_chunks').notNull().default(0),
  error:           text('error'),
  retries:         integer('retries').notNull().default(0),
  createdAt:       timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
  updatedAt:       timestamp('updated_at', { withTimezone: true }).notNull().defaultNow(),
})

Schema 即文档,建表语句由 drizzle-kit 根据 schema 自动生成,不需要手写 SQL DDL。

Drizzle 客户端:

typescript
// apps/api/src/db/client.ts
import postgres from 'postgres'
import { drizzle } from 'drizzle-orm/postgres-js'
import * as schema from '@knowledgeops/shared/db/schema'
import { config } from '../config'

const client = postgres(config.database.url)
export const db = drizzle(client, { schema })

迁移管理(drizzle-kit):

typescript
// drizzle.config.ts(项目根目录)
import { defineConfig } from 'drizzle-kit'

export default defineConfig({
  schema: './packages/shared/src/db/schema.ts',
  out: './drizzle/migrations',
  dialect: 'postgresql',
  dbCredentials: { url: process.env.DATABASE_URL! },
})
bash
pnpm drizzle-kit generate   # 根据 schema 生成 SQL migration 文件
pnpm drizzle-kit migrate    # 应用迁移到数据库

API 端的任务创建

typescript
// apps/api/src/routes/docs.ts
import { randomUUID } from 'node:crypto'

docsRouter.post('/upload', async (c) => {
  const body = await c.req.parseBody()
  const file = body['file'] as File

  // 保存文件
  const filePath = await saveFile(file)

  // 创建任务,立即返回
  const job: IngestJob = {
    id: randomUUID(),
    status: 'pending',
    fileName: file.name,
    fileSize: file.size,
    retries: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
  }

  await db.insertJob(job)

  // 立即返回 jobId,不等待处理
  return c.json({ jobId: job.id, status: 'pending' }, 202)
})

// 查询任务状态
docsRouter.get('/jobs/:id', async (c) => {
  const job = await db.getJob(c.req.param('id'))
  if (!job) return c.json({ error: '任务不存在' }, 404)
  return c.json(job)
})

db.insertJob / db.getJob 的 Drizzle 实现:

typescript
// apps/api/src/db/jobs.ts
import { eq } from 'drizzle-orm'
import { db } from './client'
import { ingestJobs } from '@knowledgeops/shared/db/schema'
import type { IngestJob } from '@knowledgeops/shared'

export async function insertJob(job: IngestJob) {
  await db.insert(ingestJobs).values({
    id:        job.id,
    status:    job.status,
    fileName:  job.fileName,
    fileSize:  job.fileSize,
    retries:   job.retries,
    createdAt: job.createdAt,
    updatedAt: job.updatedAt,
  })
}

export async function getJob(id: string) {
  const rows = await db
    .select()
    .from(ingestJobs)
    .where(eq(ingestJobs.id, id))
  return rows[0] ?? null
}

返回类型由 Drizzle 根据 schema 自动推断,不需要手写 IngestJob 泛型。

本节产物

packages/shared/src/
  db/schema.ts            # Drizzle schema(ingestJobs 表定义)
  types/job.ts            # IngestJob 类型定义
drizzle/
  migrations/             # drizzle-kit 生成的 SQL migration 文件
drizzle.config.ts         # drizzle-kit 配置
apps/api/src/
  routes/docs.ts          # 文档上传接口(返回 jobId)
  db/client.ts            # Drizzle 客户端
  db/jobs.ts              # 任务 CRUD

面试追问

任务队列和消息队列有什么区别?

消息队列(如 Kafka、RabbitMQ)侧重消息传递和发布订阅;任务队列(如 BullMQ)侧重任务调度、优先级、重试和延迟执行。任务队列通常基于消息队列实现,但提供了更多面向任务的抽象。进阶项目用 BullMQ(基于 Redis),开发简单但功能完整。

为什么 API 返回 202 而不是 200?

HTTP 202 Accepted 的语义是"请求已接受,但尚未处理完成"。这比 200 更准确——文档还没真的导入成功,只是接受了导入请求。

面向前端工程师和独立开发者的 AI 应用工程课程