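Lesson 3 · Checkpoints and Interrupt Recovery

Lesson goals

Building on the minimal StateGraph, add a Checkpoint mechanism that supports task persistence, pausing for human approval, and recovery after a crash.

Key idea: a Checkpoint is not a "just in case" safety net. It exists to support two production scenarios: long-running tasks and human-in-the-loop.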

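Two core scenarios

Scenario 1: crash recovery for long-running tasks

A document-import Worker crashes midway. After a restart it continues from the last Checkpoint instead of starting over.

Scenario 2: Human-in-the-Loop

The Agent reaches a critical node (for example, "about to send an email"), pauses for human confirmation, and continues once the action is confirmed.

Checkpoint interface design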

typescript
// packages/agent-runtime/src/checkpoint.ts
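export interface Checkpoint<S> {
  taskId: string        // identifies the task this snapshot belongs to
  state: S              // full graph state at the time of the snapshot
  currentNode: string   // the Node to execute when the task resumes
  timestamp: Date       // when the snapshot was taken
}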

export interface CheckpointStore<S> {
  save(taskId: string, checkpoint: Checkpoint<S>): Promise<void>
  load(taskId: string): Promise<Checkpoint<S> | null>
  delete(taskId: string): Promise<void>
}
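
For unit tests, an in-memory store can satisfy the same interface without touching disk. The sketch below is an assumption rather than part of the course code: `MemoryCheckpointStore` is a hypothetical name, and the two interfaces are repeated only so the snippet runs on its own. It copies `state` through JSON so a caller cannot mutate a stored snapshot after saving it.

```typescript
interface Checkpoint<S> {
  taskId: string
  state: S
  currentNode: string
  timestamp: Date
}

interface CheckpointStore<S> {
  save(taskId: string, checkpoint: Checkpoint<S>): Promise<void>
  load(taskId: string): Promise<Checkpoint<S> | null>
  delete(taskId: string): Promise<void>
}

// Hypothetical in-memory store, intended for tests only
class MemoryCheckpointStore<S> implements CheckpointStore<S> {
  private items = new Map<string, Checkpoint<S>>()

  async save(taskId: string, checkpoint: Checkpoint<S>): Promise<void> {
    this.items.set(taskId, this.copy(checkpoint))
  }

  async load(taskId: string): Promise<Checkpoint<S> | null> {
    const found = this.items.get(taskId)
    return found ? this.copy(found) : null
  }

  async delete(taskId: string): Promise<void> {
    this.items.delete(taskId)
  }

  // Copy state through JSON so stored snapshots are isolated from callers
  private copy(c: Checkpoint<S>): Checkpoint<S> {
    return {
      ...c,
      state: JSON.parse(JSON.stringify(c.state)) as S,
      timestamp: new Date(c.timestamp.getTime()),
    }
  }
}
```

Because the class implements `CheckpointStore<S>`, it can be passed anywhere a persistent store is expected.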

SQLite implementation

typescript
// packages/agent-runtime/src/checkpoint-store.ts
import Database from 'better-sqlite3'
import type { Checkpoint, CheckpointStore } from './checkpoint'

export class SqliteCheckpointStore<S> implements CheckpointStore<S> {
  private db: Database.Database

  constructor(dbPath = 'checkpoints.db') {
    this.db = new Database(dbPath)
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS checkpoints (
        task_id TEXT PRIMARY KEY,
        state TEXT NOT NULL,
        current_node TEXT NOT NULL,
        timestamp TEXT NOT NULL
      )
    `)
  }

  async save(taskId: string, checkpoint: Checkpoint<S>) {
    this.db
      .prepare(`
        INSERT OR REPLACE INTO checkpoints (task_id, state, current_node, timestamp)
        VALUES (?, ?, ?, ?)
      `)
      .run(taskId, JSON.stringify(checkpoint.state), checkpoint.currentNode, checkpoint.timestamp.toISOString())
  }

  async load(taskId: string): Promise<Checkpoint<S> | null> {
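    // Type the row explicitly instead of falling back to `any`
    const row = this.db
      .prepare('SELECT * FROM checkpoints WHERE task_id = ?')
      .get(taskId) as { state: string; current_node: string; timestamp: string } | undefined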

    if (!row) return null

    return {
      taskId,
      state: JSON.parse(row.state) as S,
      currentNode: row.current_node,
      timestamp: new Date(row.timestamp),
    }
  }

  async delete(taskId: string) {
    this.db.prepare('DELETE FROM checkpoints WHERE task_id = ?').run(taskId)
  }
}
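
Because `save` serializes `state` with `JSON.stringify` and `load` restores it with `JSON.parse`, only JSON-safe values survive a save/load cycle. The sketch below shows the same round trip on a made-up state object (the field names are illustrative, not from the course code).

```typescript
// A state object with three values that JSON cannot represent faithfully
const before = {
  title: 'import docs',
  startedAt: new Date('2024-01-01T00:00:00.000Z'),
  seen: new Map([['doc-1', true]]),
  note: undefined as string | undefined,
}

// The same round trip the store performs in save() and load()
const after = JSON.parse(JSON.stringify(before)) as Record<string, unknown>

// Date comes back as an ISO string, not a Date instance
const dateBecameString = typeof after.startedAt === 'string'
// Map entries are lost; the Map is serialized as an empty object
const mapBecameEmptyObject = JSON.stringify(after.seen) === '{}'
// Properties whose value is undefined are omitted entirely
const undefinedWasDropped = !('note' in after)
```

A simple way to stay safe is to keep graph state to strings, numbers, booleans, arrays, and plain objects, and to store dates as ISO strings.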

StateGraph with Checkpoints

On top of the existing StateGraph, save a Checkpoint after every Node runs:

typescript
// packages/agent-runtime/src/state-graph.ts (updated)

export class StateGraph<S extends State> {
  private checkpointStore?: CheckpointStore<S>

  withCheckpoint(store: CheckpointStore<S>): this {
    this.checkpointStore = store
    return this
  }

  async invoke(initialState: S, startNode: string, taskId?: string): Promise<S> {
    let state = { ...initialState }
    let currentNode: string | '__end__' = startNode

    // If a taskId is given, try to resume from a Checkpoint
    if (taskId && this.checkpointStore) {
      const checkpoint = await this.checkpointStore.load(taskId)
      if (checkpoint) {
        console.log(`[Graph] Resuming task ${taskId} from Checkpoint at Node: ${checkpoint.currentNode}`)
        state = checkpoint.state
        currentNode = checkpoint.currentNode
      }
    }

    while (currentNode !== '__end__') {
      const nodeFn = this.nodes.get(currentNode)
      if (!nodeFn) throw new Error(`Node "${currentNode}" does not exist`)

      const update = await nodeFn(state)
      state = this.reducer(state, update)

      // Resolve the outgoing edge once, so a conditional edge is not evaluated twice.
      // A Node without an outgoing edge ends the run.
      const edge = this.edges.get(currentNode)
      const nextNode = !edge ? '__end__' : typeof edge === 'string' ? edge : edge(state)

      // Save a Checkpoint after the Node has run
      if (taskId && this.checkpointStore) {
        await this.checkpointStore.save(taskId, {
          taskId,
          state,
          currentNode: nextNode,  // store the NEXT Node to execute
          timestamp: new Date(),
        })
      }

      currentNode = nextNode
    }

    // Task finished: clear the Checkpoint
    if (taskId && this.checkpointStore) {
      await this.checkpointStore.delete(taskId)
    }

    return state
  }
}
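
To see the recovery path end to end, here is a self-contained sketch of Scenario 1. It is a simplified stand-in for `invoke`, not the course implementation: `run`, `demo`, the node names, and the in-memory `saved` map are all hypothetical, and only static edges are supported. The `embed` node throws on its first call to simulate a Worker crash.

```typescript
type DemoState = { log: string[] }
type NodeFn = (s: DemoState) => Promise<Partial<DemoState>>

// In-memory stand-in for a CheckpointStore: taskId -> snapshot
const saved = new Map<string, { state: DemoState; currentNode: string }>()

let crashOnce = true
const nodes = new Map<string, NodeFn>([
  ['parse', async (s) => ({ log: [...s.log, 'parse'] })],
  ['embed', async (s) => {
    if (crashOnce) {
      crashOnce = false
      throw new Error('worker crashed') // simulated crash on the first attempt
    }
    return { log: [...s.log, 'embed'] }
  }],
  ['index', async (s) => ({ log: [...s.log, 'index'] })],
])
const edges = new Map<string, string>([['parse', 'embed'], ['embed', 'index']])

// Simplified version of the invoke loop (static edges, shallow-merge reducer)
async function run(initial: DemoState, start: string, taskId: string): Promise<DemoState> {
  const cp = saved.get(taskId)
  let state = cp ? cp.state : { ...initial }
  let current = cp ? cp.currentNode : start
  while (current !== '__end__') {
    const fn = nodes.get(current)
    if (!fn) throw new Error(`Node "${current}" does not exist`)
    state = { ...state, ...(await fn(state)) }
    const next = edges.get(current) ?? '__end__'
    saved.set(taskId, { state, currentNode: next }) // checkpoint points at the next Node
    current = next
  }
  saved.delete(taskId) // finished: clear the checkpoint
  return state
}

async function demo(): Promise<DemoState> {
  // First attempt: 'parse' completes, then 'embed' throws
  await run({ log: [] }, 'parse', 'task-1').catch(() => undefined)
  // Second attempt with the same taskId resumes at 'embed'
  return run({ log: [] }, 'parse', 'task-1')
}
```

In this sketch the second call ignores its `initial` argument because a checkpoint exists, so `parse` appears in the final log exactly once.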

Pausing for human approval

Add an "interrupt edge" after the Node that needs human confirmation:

typescript
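// State fields used to signal an interrupt
interface AgentState {
  // ...
  pendingAction?: string     // the action waiting to be executed
  approvalGranted?: boolean  // set to true once a human has confirmed
  interrupted?: boolean
  awaitingApproval?: string  // description of the action awaiting approval
}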

// Approval node
graph.addNode('check-approval', async (state) => {
  if (!state.approvalGranted) {
    return {
      interrupted: true,
      awaitingApproval: `About to run: ${state.pendingAction}. Please confirm.`,
    }
  }
  return { interrupted: false }
})

// Conditional edge: stop here if approval is still needed, otherwise continue
graph.addEdge('check-approval', (state) =>
  state.interrupted ? '__end__' : 'execute-action'
)

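// From the caller's side:
// 1. The first run hits interrupted=true, stops, and returns awaitingApproval.
// 2. After a human confirms, set approvalGranted=true and call again with the same taskId.
// 3. The run resumes from the Checkpoint and continues.
// Note: step 3 requires invoke to treat an interrupt differently from normal completion.
// Reaching '__end__' deletes the Checkpoint, and a restored state does not pick up the
// new approvalGranted value, so invoke has to keep the Checkpoint on interrupt, point it
// at 'check-approval', and merge the approval into the restored state.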
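
The pause-and-resume flow can be sketched as a self-contained example. This is one possible design under stated assumptions, not the course implementation: `invokeWithApproval`, its `patch` parameter, and the in-memory `checkpoints` map are hypothetical. It also differs from the conditional-edge version above in that the runner itself checks `interrupted`, keeps a checkpoint pointing back at the approval node, and returns early.

```typescript
interface ApprovalState {
  pendingAction: string
  approvalGranted?: boolean
  interrupted?: boolean
  awaitingApproval?: string
  sent?: boolean
}
type ApprovalNode = (s: ApprovalState) => Promise<Partial<ApprovalState>>

// In-memory stand-in for a CheckpointStore
const checkpoints = new Map<string, { state: ApprovalState; currentNode: string }>()

const nodes = new Map<string, ApprovalNode>([
  ['check-approval', async (s) =>
    s.approvalGranted
      ? { interrupted: false, awaitingApproval: undefined }
      : { interrupted: true, awaitingApproval: `About to run: ${s.pendingAction}. Please confirm.` }],
  ['execute-action', async () => ({ sent: true })],
])
const edges = new Map<string, string>([['check-approval', 'execute-action']])

// Hypothetical runner: `patch` carries the human decision into the restored state
async function invokeWithApproval(
  initial: ApprovalState,
  start: string,
  taskId: string,
  patch: Partial<ApprovalState> = {},
): Promise<ApprovalState> {
  const cp = checkpoints.get(taskId)
  let state: ApprovalState = { ...(cp ? cp.state : initial), ...patch }
  let current = cp ? cp.currentNode : start
  while (current !== '__end__') {
    const fn = nodes.get(current)
    if (!fn) throw new Error(`Node "${current}" does not exist`)
    state = { ...state, ...(await fn(state)) }
    if (state.interrupted) {
      // Pause: keep a checkpoint that points back at the interrupting Node
      checkpoints.set(taskId, { state, currentNode: current })
      return state
    }
    current = edges.get(current) ?? '__end__'
    checkpoints.set(taskId, { state, currentNode: current })
  }
  checkpoints.delete(taskId) // normal completion: clear the checkpoint
  return state
}

async function demo() {
  const input = { pendingAction: 'send email' }
  const paused = await invokeWithApproval(input, 'check-approval', 'task-42')
  const done = await invokeWithApproval(input, 'check-approval', 'task-42', { approvalGranted: true })
  return { paused, done }
}
```

Re-running the approval node on resume keeps the decision logic in one place: the node sees `approvalGranted` in the merged state and lets the run proceed to `execute-action`.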

Tool Call trace recording

Record every tool call made while the Agent runs:

typescript
interface TraceEntry {
  taskId: string
  nodeId: string
  type: 'node_start' | 'node_end' | 'tool_call' | 'tool_result'
  data: unknown
  timestamp: Date
}

// Record the trace inside StateGraph.invoke
// (`...` below stands for the remaining TraceEntry fields: taskId and timestamp)
async invoke(initialState: S, startNode: string, options?: InvokeOptions) {
  const trace: TraceEntry[] = []

  while (currentNode !== '__end__') {
    trace.push({ nodeId: currentNode, type: 'node_start', data: state, ... })
    const update = await nodeFn(state)
    trace.push({ nodeId: currentNode, type: 'node_end', data: update, ... })
    // ...
  }

  // Save the trace for debugging
  if (options?.traceStore) {
    await options.traceStore.save(trace)
  }
}
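
The loop above records `node_start` and `node_end`; the `tool_call` and `tool_result` entry types need a hook at the point where a tool is invoked. One way to get that is to wrap each tool function. The `traced` helper and the `search` tool below are hypothetical, and `TraceEntry` is repeated only so the sketch runs on its own.

```typescript
interface TraceEntry {
  taskId: string
  nodeId: string
  type: 'node_start' | 'node_end' | 'tool_call' | 'tool_result'
  data: unknown
  timestamp: Date
}

type Tool<A, R> = (args: A) => Promise<R>

// Hypothetical helper: wraps a tool so every call appends two trace entries
function traced<A, R>(
  name: string,
  tool: Tool<A, R>,
  trace: TraceEntry[],
  taskId: string,
  nodeId: string,
): Tool<A, R> {
  const entry = (type: TraceEntry['type'], data: unknown): TraceEntry =>
    ({ taskId, nodeId, type, data, timestamp: new Date() })

  return async (args: A) => {
    trace.push(entry('tool_call', { name, args }))
    try {
      const result = await tool(args)
      trace.push(entry('tool_result', { name, result }))
      return result
    } catch (err) {
      // Record failures too, then let the Node decide how to handle them
      trace.push(entry('tool_result', { name, error: String(err) }))
      throw err
    }
  }
}

// Usage: wrap a tool once, then call it from inside a Node as usual
const trace: TraceEntry[] = []
const search = traced(
  'search',
  async (query: string) => [`result for ${query}`],
  trace,
  'task-1',
  'retrieve',
)
```

Wrapping keeps tracing out of the tool's own code, so the same tool can run with or without a trace.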

Deliverables

packages/agent-runtime/src/
  checkpoint.ts           # Checkpoint interface definitions
  checkpoint-store.ts     # SQLite implementation
  state-graph.ts          # StateGraph with Checkpoint support (full version)

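Interview follow-ups

What is the difference between a Checkpoint and a Savepoint?

Conceptually they are similar: both save state at a point in time so it can be restored later. The difference is context. Databases use Savepoints (rollback points inside a transaction), games use save files, and LangGraph calls them Checkpoints (snapshots of Agent state). A LangGraph Checkpoint is not a transactional rollback: saved history is read-only, and what it supports is continuing execution from a chosen checkpoint.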

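How do you guarantee idempotency when resuming after an interrupt?

The key is that the Checkpoint stores the next Node to execute, not the Node that just finished, so a Node whose Checkpoint has been saved is not run again after recovery. A Node that was still running when the process crashed, or that finished but crashed before its Checkpoint was written, does run again on resume. That is why operations with side effects (such as sending an email) also need idempotency handling inside the Node (check whether the action has already been performed).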
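
A minimal sketch of idempotency handling inside a Node, assuming a per-task idempotency key. Everything here is hypothetical: `completedKeys` stands in for a durable record (for example a database table), and `sendEmail` is a fake sender that only appends to an in-memory `outbox`.

```typescript
interface MailState {
  taskId: string
  to: string
  emailSent?: boolean
}

// Stand-in for a durable record of side effects that have already happened
const completedKeys = new Set<string>()

// Fake sender: records deliveries instead of sending real email
const outbox: string[] = []
async function sendEmail(to: string): Promise<void> {
  outbox.push(to)
}

async function sendEmailNode(state: MailState): Promise<Partial<MailState>> {
  const key = `${state.taskId}:send-email`
  // If a previous attempt already sent the email, skip the side effect
  if (completedKeys.has(key)) return { emailSent: true }
  await sendEmail(state.to)
  completedKeys.add(key)
  return { emailSent: true }
}
```

A crash between `sendEmail` and `completedKeys.add` can still produce a duplicate, so where the email provider accepts an idempotency key, passing the same key to it closes that gap.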

An AI application engineering course for frontend engineers and independent developers