Lesson 3 · Checkpoints and Interrupt Recovery
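Goals of this lesson
Add a checkpoint mechanism to the minimal StateGraph so that tasks can be persisted, paused to wait for human approval, and resumed after a crash.
Key idea: checkpoints are not an "insurance policy". They exist to support two production scenarios: long-running tasks and human-in-the-loop.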
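Two core scenarios
Scenario 1: crash recovery for long-running tasks
A document-import Worker crashes partway through; after a restart it continues from where it left off instead of starting over.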
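Scenario 1 boils down to "after each step succeeds, persist the position of the next step". A minimal sketch of just that idea, assuming a hypothetical chunked import (`importDocument`, `ProgressStore` and the chunk names are illustrative, not part of this lesson's code) with an in-memory `Map` standing in for a persistent store:

```typescript
// Hypothetical chunked document import. The checkpoint is "index of the next chunk to run".
type ProgressStore = Map<string, number> // stands in for a persistent store such as SQLite

async function importDocument(
  store: ProgressStore,
  taskId: string,
  chunks: string[],
  processChunk: (chunk: string) => Promise<void>,
): Promise<void> {
  let next = store.get(taskId) ?? 0 // resume point, or 0 on a fresh run
  while (next < chunks.length) {
    await processChunk(chunks[next])
    next += 1
    store.set(taskId, next) // persist only after the chunk has succeeded
  }
  store.delete(taskId) // finished: nothing left to resume
}

// Simulates a worker that crashes on chunk "c" the first time and is then restarted.
async function demo(): Promise<{ processed: string[]; leftover: number }> {
  const store: ProgressStore = new Map()
  const chunks = ['a', 'b', 'c', 'd']
  const processed: string[] = []
  let crashed = false

  const processChunk = async (chunk: string): Promise<void> => {
    if (chunk === 'c' && !crashed) {
      crashed = true
      throw new Error('worker crashed')
    }
    processed.push(chunk)
  }

  await importDocument(store, 'doc-1', chunks, processChunk).catch(() => {
    // the first run dies here; the store still holds next = 2
  })
  await importDocument(store, 'doc-1', chunks, processChunk) // the restart resumes at "c"
  return { processed, leftover: store.size }
}

demo().then(({ processed }) => console.log(processed.join(','))) // a,b,c,d
```

Because the position is saved only after `processChunk` returns, a chunk that was in flight when the process died is processed again on restart; this is why the idempotency question at the end of this lesson matters.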
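Scenario 2: Human-in-the-Loop
The Agent reaches a critical node (e.g. "about to send an email"), pauses to wait for human confirmation, and continues once a human confirms.
Checkpoint interface design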
```typescript
// packages/agent-runtime/src/checkpoint.ts
export interface Checkpoint<S> {
  taskId: string
  state: S
  currentNode: string // the next node to run when resuming
  timestamp: Date
}

export interface CheckpointStore<S> {
  save(taskId: string, checkpoint: Checkpoint<S>): Promise<void>
  load(taskId: string): Promise<Checkpoint<S> | null>
  delete(taskId: string): Promise<void>
}
```

SQLite implementation
```typescript
// packages/agent-runtime/src/checkpoint-store.ts
import Database from 'better-sqlite3'
import type { Checkpoint, CheckpointStore } from './checkpoint'

// Shape of a row in the checkpoints table
interface CheckpointRow {
  task_id: string
  state: string // JSON-serialized state
  current_node: string
  timestamp: string // ISO 8601
}

export class SqliteCheckpointStore<S> implements CheckpointStore<S> {
  private db: Database.Database

  constructor(dbPath = 'checkpoints.db') {
    this.db = new Database(dbPath)
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS checkpoints (
        task_id TEXT PRIMARY KEY,
        state TEXT NOT NULL,
        current_node TEXT NOT NULL,
        timestamp TEXT NOT NULL
      )
    `)
  }

  async save(taskId: string, checkpoint: Checkpoint<S>): Promise<void> {
    // One row per task: each save overwrites the previous checkpoint
    this.db
      .prepare(`
        INSERT OR REPLACE INTO checkpoints (task_id, state, current_node, timestamp)
        VALUES (?, ?, ?, ?)
      `)
      .run(taskId, JSON.stringify(checkpoint.state), checkpoint.currentNode, checkpoint.timestamp.toISOString())
  }

  async load(taskId: string): Promise<Checkpoint<S> | null> {
    const row = this.db
      .prepare('SELECT * FROM checkpoints WHERE task_id = ?')
      .get(taskId) as CheckpointRow | undefined
    if (!row) return null
    return {
      taskId,
      state: JSON.parse(row.state) as S, // note: the JSON round-trip turns Date fields into strings
      currentNode: row.current_node,
      timestamp: new Date(row.timestamp),
    }
  }

  async delete(taskId: string): Promise<void> {
    this.db.prepare('DELETE FROM checkpoints WHERE task_id = ?').run(taskId)
  }
}
```

StateGraph with Checkpoint
On top of the original StateGraph, save a checkpoint after every node runs:
```typescript
// packages/agent-runtime/src/state-graph.ts (updated)
import type { CheckpointStore } from './checkpoint'

export class StateGraph<S extends State> {
  // nodes, edges, reducer, addNode() and addEdge() are unchanged from the previous lesson
  private checkpointStore?: CheckpointStore<S>

  withCheckpoint(store: CheckpointStore<S>): this {
    this.checkpointStore = store
    return this
  }

  async invoke(initialState: S, startNode: string, taskId?: string): Promise<S> {
    let state = { ...initialState }
    let currentNode = startNode

    // With a taskId, try to resume from a checkpoint (the saved state replaces initialState)
    if (taskId && this.checkpointStore) {
      const checkpoint = await this.checkpointStore.load(taskId)
      if (checkpoint) {
        console.log(`[Graph] Resuming task ${taskId} at node: ${checkpoint.currentNode}`)
        state = checkpoint.state
        currentNode = checkpoint.currentNode
      }
    }

    while (currentNode !== '__end__') {
      const nodeFn = this.nodes.get(currentNode)
      if (!nodeFn) throw new Error(`Node "${currentNode}" does not exist`)
      const update = await nodeFn(state)
      state = this.reducer(state, update)

      // Paused (e.g. waiting for approval): point the checkpoint back at this node so it
      // re-runs on resume, and return WITHOUT deleting the checkpoint
      if ((state as { interrupted?: boolean }).interrupted) {
        if (taskId && this.checkpointStore) {
          await this.checkpointStore.save(taskId, { taskId, state, currentNode, timestamp: new Date() })
        }
        return state
      }

      // Resolve the next node once, so a conditional edge is evaluated only once per step
      const edge = this.edges.get(currentNode)
      const nextNode = !edge ? '__end__' : typeof edge === 'string' ? edge : edge(state)

      // Save the "next node to run", not the node that just finished
      if (taskId && this.checkpointStore) {
        await this.checkpointStore.save(taskId, { taskId, state, currentNode: nextNode, timestamp: new Date() })
      }
      currentNode = nextNode
    }

    // Task finished: clear the checkpoint
    if (taskId && this.checkpointStore) {
      await this.checkpointStore.delete(taskId)
    }
    return state
  }
}
```

Interrupting to wait for human approval
Add an approval-check node, followed by an "interrupt edge" that stops the graph until a human confirms:
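```typescript
// Interrupt-related fields in the Agent state
interface AgentState {
  // ...
  pendingAction?: string // the action waiting to run, e.g. "send email to ..."
  approvalGranted?: boolean // set to true once a human approves
  interrupted?: boolean
  awaitingApproval?: string // description of the action awaiting approval
}

// Approval node
graph.addNode('check-approval', async (state) => {
  if (!state.approvalGranted) {
    return {
      interrupted: true,
      awaitingApproval: `About to run: ${state.pendingAction}. Please confirm.`,
    }
  }
  return { interrupted: false }
})

// Conditional edge: stop here if approval is still needed, otherwise continue
graph.addEdge('check-approval', (state) =>
  state.interrupted ? '__end__' : 'execute-action'
)

// Caller side:
// 1. First call: check-approval returns interrupted=true, the run stops, and the caller
//    shows state.awaitingApproval to a human.
// 2. Once a human confirms, set approvalGranted=true in the saved checkpoint
//    (load it from the CheckpointStore, update its state, save it back).
// 3. Call invoke again with the same taskId: it resumes from the checkpoint,
//    check-approval now passes, and execution continues at execute-action.
// This relies on invoke keeping a checkpoint that points back at check-approval when it
// stops on an interrupt; if invoke deletes the checkpoint on reaching '__end__', the
// second call starts over from startNode.
```

Tool call trace recording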
Record the steps of an Agent run, including every tool call, so the run can be inspected when debugging:
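```typescript
interface TraceEntry {
  taskId: string
  nodeId: string
  // 'tool_call' / 'tool_result' are meant for tool invocations made inside nodes
  type: 'node_start' | 'node_end' | 'tool_call' | 'tool_result'
  data: unknown
  timestamp: Date
}

// Assumed shapes: these two types are not defined elsewhere in this lesson
interface TraceStore {
  save(trace: TraceEntry[]): Promise<void>
}
interface InvokeOptions {
  taskId?: string
  traceStore?: TraceStore
}

// Inside StateGraph<S>: record the start and end of every node
async invoke(initialState: S, startNode: string, options?: InvokeOptions): Promise<S> {
  const trace: TraceEntry[] = []
  const taskId = options?.taskId ?? ''
  // ... state / currentNode setup and checkpoint restore as above ...
  try {
    while (currentNode !== '__end__') {
      // ... look up nodeFn ...
      trace.push({ taskId, nodeId: currentNode, type: 'node_start', data: state, timestamp: new Date() })
      const update = await nodeFn(state)
      trace.push({ taskId, nodeId: currentNode, type: 'node_end', data: update, timestamp: new Date() })
      // ... reducer, checkpoint save, move to the next node ...
    }
  } finally {
    // Save the trace for debugging, even when a node throws
    if (options?.traceStore) {
      await options.traceStore.save(trace)
    }
  }
  return state
}
```

Deliverables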
```
packages/agent-runtime/src/
  checkpoint.ts        # Checkpoint interface definitions
  checkpoint-store.ts  # SQLite implementation
  state-graph.ts       # StateGraph with Checkpoint support (full version)
```

Interview follow-ups
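What is the difference between a Checkpoint and a Savepoint?
They are similar in concept: both record state at a point in time so you can get back to it. The difference lies in context and semantics. In a database, a Savepoint is a rollback point inside a transaction, and rolling back to it undoes the data changes made after it; games call the same idea a save file; LangGraph calls it a Checkpoint, a snapshot of Agent state. A LangGraph Checkpoint does not undo anything: saved history is read-only, and recovery means continuing execution from a saved checkpoint, so side effects a node has already performed (an email already sent, say) stay done. The SQLite store in this lesson is simpler still: INSERT OR REPLACE keeps only the latest checkpoint per task.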
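How do you ensure idempotency when resuming after an interruption?
The key is that the checkpoint stores the next node to run rather than the node that just finished, so after a crash the graph resumes at the first node whose result was never saved and does not re-run nodes that completed and were checkpointed. That gives at-least-once execution, not exactly-once: a node that was running when the process died (including one that had already performed its side effect but had not yet saved a checkpoint) runs again on recovery. Side-effecting operations such as sending email therefore also need idempotency inside the node, e.g. checking whether the action has already been done before doing it.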
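The in-node idempotency check can be sketched as a node that consults a record of completed side effects before acting. This is a minimal sketch under stated assumptions: `EmailState`, `sendEmailNode`, the `${taskId}:send-email` key and the in-memory `doneKeys` set are hypothetical, with `doneKeys` standing in for durable storage such as a database table.

```typescript
// Hypothetical state for a node that sends an email
interface EmailState {
  taskId: string
  to: string
  body: string
  emailSent?: boolean
}

// Sends the email at most once per task, even if recovery re-runs this node.
// `doneKeys` stands in for a durable record of completed side effects.
async function sendEmailNode(
  state: EmailState,
  send: (to: string, body: string) => Promise<void>,
  doneKeys: Set<string>,
): Promise<Partial<EmailState>> {
  const key = `${state.taskId}:send-email` // idempotency key: one email per task
  if (state.emailSent || doneKeys.has(key)) {
    return { emailSent: true } // already sent: skip the side effect
  }
  await send(state.to, state.body)
  doneKeys.add(key) // record completion immediately after the side effect
  return { emailSent: true }
}

// Simulates recovery: the node runs again with the state from the last checkpoint,
// which was saved before this node finished, so emailSent is still unset there.
async function demo(): Promise<number> {
  const doneKeys = new Set<string>()
  let sends = 0
  const send = async (): Promise<void> => {
    sends += 1
  }
  const checkpointedState: EmailState = { taskId: 'task-1', to: 'user@example.com', body: 'Hello' }
  await sendEmailNode(checkpointedState, send, doneKeys) // original run
  await sendEmailNode(checkpointedState, send, doneKeys) // re-run after a crash
  return sends
}

demo().then((sends) => console.log(sends)) // 1
```

A small window remains: if the process dies after `send()` returns but before the key is recorded, the email goes out twice. Closing that window completely requires the receiving service to deduplicate on the same key, where the service supports it.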