为什么必须要 Checkpointer
没有 Checkpointer 的 Agent 是一次性的:invoke 一次、state 活在内存、结束即毁。真实产品里会遇到:
- 用户关了浏览器,明天打开继续对话 → 要恢复历史
- Agent 跑到第 4 步崩了 → 从第 4 步重试,不是从头
- 用户说"上一步别那样,重来" → 要回到上一步
- 想调试"如果这一步改成 X 会怎样" → 要能分支
这些都是 Checkpointer 提供的能力。
三件套:Checkpointer + thread_id + config
from langgraph.checkpoint.memory import MemorySaver checkpointer = MemorySaver() # 内存版(进程重启就没) app = g.compile(checkpointer=checkpointer) config = {"configurable": {"thread_id": "user-42-session-1"}} # 第 1 次调用 app.invoke({"messages": [("user", "我叫小明")]}, config=config) # 第 2 次调用 — 同一个 thread_id,自动接着聊 app.invoke({"messages": [("user", "我叫什么?")]}, config=config) # → "你叫小明" ✓
thread_id 就是"会话 ID"
同一个 thread_id 共享 state。不同用户、不同会话用不同 thread_id 隔离。可以是 UUID、也可以是业务 ID 如 "u42:s7"。
三种 Checkpointer 对比
| 实现 | 特点 | 适用 |
|---|---|---|
MemorySaver | 进程内 dict,零依赖 | 单测、开发、短会话 |
SqliteSaver | 本地 SQLite 文件 | 个人工具、CLI、小规模 |
PostgresSaver | 生产级,支持高并发 | 线上服务、多副本 |
SQLite 版
from langgraph.checkpoint.sqlite import SqliteSaver with SqliteSaver.from_conn_string("checkpoints.sqlite") as checkpointer: app = g.compile(checkpointer=checkpointer) app.invoke(...)
Postgres 版(异步推荐)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver DB_URL = "postgresql://user:pwd@localhost:5432/langgraph" async with AsyncPostgresSaver.from_conn_string(DB_URL) as checkpointer: await checkpointer.setup() # 首次建表 app = g.compile(checkpointer=checkpointer) result = await app.ainvoke({"messages": [...]}, config=config)
查状态:get_state / get_state_history
# 当前状态 snap = app.get_state(config) print(snap.values) # {"messages": [...]} print(snap.next) # 下一步要跑的节点(中断时有值) print(snap.config) # 带 checkpoint_id # 历史快照(最新在前) for snap in app.get_state_history(config): print(snap.metadata["step"], snap.values["messages"][-1].content[:50])
回溯:从某个 checkpoint 重跑
每个 snapshot 有唯一的 checkpoint_id。用它做"时光倒流":
# 找到想回的那一步 history = list(app.get_state_history(config)) target = history[2] # 比如倒数第 3 步 # 用它的 config 继续跑 — 走新的分支 new_config = target.config app.invoke(None, config=new_config) # None = 不加输入,从快照继续
手动改 state:update_state
可以在不跑任何节点的情况下修改 state。适合"用户说上一条回答错了,我帮你改":
app.update_state( config, {"messages": [AIMessage(content="改过的回答", id="m7")]}, as_node="llm", # 装作这个更新是 llm 节点产的 ) # 下一次 invoke 会从这个新 state 继续
多租户:thread_id 的命名策略
生产里 thread_id 是会话隔离的唯一手段。常见策略:
# 简单: 用户 ID + 会话自增 thread_id = f"{user_id}:{session_n}" # 多项目: 加上应用标识,不同应用共享 DB thread_id = f"{app_name}:{user_id}:{session_id}" # 业务语义: 挂在具体对象上 thread_id = f"ticket:{ticket_id}"
thread_id 一定要唯一且稳定
同一个会话在 invoke 多次之间必须用同一个 ID,否则会"失忆"。
不同会话别共用 ID,否则互相污染。
checkpoint 的生命周期
Checkpointer 不会自己清理历史。生产中要定期清理过期 thread:
# 清掉一整个 thread 的所有 checkpoint app.checkpointer.delete_thread(thread_id="user-42-session-1") # 或者直接 DB 侧按时间清理(Postgres) # DELETE FROM checkpoints WHERE created_at < NOW() - INTERVAL '30 days'
跨节点共享:store(长期记忆)
Checkpoint = 单会话的状态;store = 跨会话的长期记忆(用户档案、偏好):
from langgraph.store.memory import InMemoryStore store = InMemoryStore() app = g.compile(checkpointer=checkpointer, store=store) def save_profile(state, config, *, store): user_id = config["configurable"]["user_id"] store.put(("profiles", user_id), "last_q", {"q": state["query"]}) def load_profile(state, config, *, store): user_id = config["configurable"]["user_id"] prof = store.get(("profiles", user_id), "last_q") return {"prior_q": prof.value["q"] if prof else None}
实战:对话历史太长怎么办
长会话里 messages 可能涨到成百上千条,既贵又慢。两种策略:
① 滑动窗口 + 摘要
def truncate(s): msgs = s["messages"] if len(msgs) > 20: # 摘要前半段,保留后 10 条 summary = summarize(msgs[:-10]) return {"messages": [RemoveMessage(id=m.id) for m in msgs[:-10]] + [SystemMessage(content=f"之前对话摘要: {summary}")]} return {}
② RemoveMessage:add_messages 的删除支持
from langchain_core.messages import RemoveMessage def clean(s): return {"messages": [RemoveMessage(id=m.id) for m in s["messages"][:-5]]}
常见坑
| 坑 | 症状 | 解法 |
|---|---|---|
| 忘传 config | "thread_id required" | 每次 invoke 都带 config |
| 换了 thread_id 却期望记住 | Agent 失忆 | 同会话保持 ID 一致 |
| MemorySaver 上线 | 重启就丢 | 生产用 Postgres/SQLite |
| checkpoint 爆表 | DB 占用飞涨 | 定期 delete_thread / 按时间 GC |
| async checkpointer 被同步调用 | 协程未 await | app 用 ainvoke/astream |
本章小结
- Checkpointer = Agent 的"存档",靠 thread_id 隔离
- 开发用 MemorySaver,生产用 Postgres(支持并发 + 持久)
- 时光机三件套:
get_state/get_state_history/update_state - 长期记忆用
Store,和 checkpoint 互补 - 长对话要有 RemoveMessage / 摘要策略,别无限累积