Chapter 07

Graph · 复杂 Agent 的状态机骨架

一个 Agent 解决不了所有问题——多步工作流、人在环路、分支合流,这些场景需要状态机pydantic_graph 是 Pydantic AI 内置的极简状态机框架。

一、什么时候才需要 Graph?

老实讲,绝大多数场景用不到 Graph。一个 Agent + 几个工具 + 一个 output_type 就能解决。Graph 只在以下场景才有优势:

判别思路:如果你能把事情画成一张"有命名节点 + 带方向箭头"的图,并且图不止"线性一条龙",那就是 Graph 的场景。否则单 Agent 就够。

二、pydantic_graph 的三个核心概念

BaseNode
每个节点是一个继承 BaseNode——它的 run 方法是这个节点的逻辑,返回值是下一个节点(或 End)。节点即类型,边即"返回了谁"。
End
表示图的终止,携带最终结果。End[OutputType] 的类型参数就是图的返回类型。
Graph
整个图对象。构造时传入所有节点类,运行时 graph.run(start_node, state=..., deps=...)

三、Hello Graph:一个两节点的小图

from __future__ import annotations
from dataclasses import dataclass, field
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

@dataclass
class GraphState:
    count: int = 0

@dataclass
class Increment(BaseNode[GraphState]):
    step: int = 1

    async def run(self, ctx: GraphRunContext[GraphState]) -> Increment | End[int]:
        ctx.state.count += self.step
        if ctx.state.count >= 5:
            return End(ctx.state.count)
        return Increment(step=self.step)

graph = Graph(nodes=[Increment])

result = graph.run_sync(Increment(step=1), state=GraphState())
print(result.output)     # 5
print(result.state)      # GraphState(count=5)

理解这段代码:

四、多节点 + 分支:带判断的图

把一个 Agent 流程画成图——模型先判断"用户想干什么",再路由到不同处理节点:

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Literal
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

# ─── 状态与依赖 ───
@dataclass
class State:
    user_input: str = ""
    intent: str = ""
    answer: str = ""

# ─── 分类器 Agent ───
class IntentResult(BaseModel):
    intent: Literal["chat", "search", "book"]

classifier = Agent("openai:gpt-4o-mini", output_type=IntentResult,
    system_prompt="判断用户意图:闲聊=chat、查资料=search、预订=book。")

# ─── 节点 ───
@dataclass
class Classify(BaseNode[State]):
    async def run(self, ctx: GraphRunContext[State]) -> Chat | Search | Book:
        r = await classifier.run(ctx.state.user_input)
        ctx.state.intent = r.output.intent
        if r.output.intent == "chat":
            return Chat()
        if r.output.intent == "search":
            return Search()
        return Book()

@dataclass
class Chat(BaseNode[State]):
    async def run(self, ctx: GraphRunContext[State]) -> End[str]:
        a = await chatter.run(ctx.state.user_input)
        ctx.state.answer = a.output
        return End(a.output)

@dataclass
class Search(BaseNode[State]):
    async def run(self, ctx: GraphRunContext[State]) -> End[str]:
        a = await searcher.run(ctx.state.user_input)
        ctx.state.answer = a.output
        return End(a.output)

@dataclass
class Book(BaseNode[State]):
    async def run(self, ctx: GraphRunContext[State]) -> End[str]:
        a = await booker.run(ctx.state.user_input)
        ctx.state.answer = a.output
        return End(a.output)

# ─── 组图 ───
graph = Graph(nodes=[Classify, Chat, Search, Book])

async def handle(question: str):
    state = State(user_input=question)
    result = await graph.run(Classify(), state=state)
    print("意图:", state.intent, "答案:", result.output)

流程:Classify 节点调用分类 Agent,根据意图返回 Chat / Search / Book 三个节点之一;这三个节点再各自调用专属 Agent 得到答案,最后 End(answer)

为什么这比直接嵌套 if/await 好?

五、可视化:一键画图

print(graph.mermaid_code(start_node=Classify))
stateDiagram-v2
  [*] --> Classify
  Classify --> Chat
  Classify --> Search
  Classify --> Book
  Chat --> [*]
  Search --> [*]
  Book --> [*]

把这段贴到 mermaid.live 就能看到图。评审代码时把流程图塞进 PR 描述——架构讨论变成"图"而不是"猜"。

六、State 和 Deps 的分工

Graph 有两个运行时"容器",别混了:

容器性质谁读谁写放什么
state图生命周期共享,可变所有节点读写流程中累积的数据(意图、中间结果、步数)
deps图运行时依赖,不可变引用所有节点只读外部依赖(DB、HTTP client、配置)
@dataclass
class State:
    count: int = 0

@dataclass
class Deps:
    db: object
    http: object

@dataclass
class MyNode(BaseNode[State, Deps]):   # 泛型两个参数:State, Deps
    async def run(self, ctx: GraphRunContext[State, Deps]) -> End[int]:
        rows = await ctx.deps.db.fetch("...")
        ctx.state.count += len(rows)
        return End(ctx.state.count)

graph = Graph(nodes=[MyNode])
result = await graph.run(MyNode(), state=State(), deps=Deps(db=real_db, http=real_http))

七、持久化:长任务的断点续跑

Graph 的一大杀手锏:BaseStatePersistence。它在每次节点运行前后做 snapshot,失败后可以从中断处恢复。

from pydantic_graph.persistence.file import FileStatePersistence
from pathlib import Path

persistence = FileStatePersistence(Path(f"/tmp/graph/{run_id}.json"))

async with graph.iter(Classify(), state=state, persistence=persistence) as run:
    async for node in run:
        print("跑完节点:", node)  # 每个节点执行完都 snapshot 到磁盘
    result = run.result

进程挂了?新进程用相同 run_id 恢复:

async with graph.iter_from_persistence(persistence) as run:
    async for node in run:
        print("续跑节点:", node)

自带的实现:

八、人在环路(Human-in-the-Loop)

流程跑到某节点需要人工批准才继续?思路是:节点返回一个特殊的 End,把 state 存住;人工批完后从下一个节点接着跑。

@dataclass
class AwaitApproval(BaseNode[State]):
    draft: str
    async def run(self, ctx):
        ctx.state.draft = self.draft
        return End({"status": "needs_approval", "draft": self.draft})

@dataclass
class Publish(BaseNode[State]):
    async def run(self, ctx):
        # 发布 ctx.state.draft
        return End({"status": "published"})

graph = Graph(nodes=[GenerateDraft, AwaitApproval, Publish])

# 第一次运行到 AwaitApproval 结束
result1 = await graph.run(GenerateDraft(), state=state, persistence=persistence)
# 返回给前端:{"status": "needs_approval", "draft": "..."}

# 人工审完点批准,新请求继续:
result2 = await graph.run(Publish(), state=state, persistence=persistence)

九、和 LangGraph 的同异

LangGraph

  • 节点是函数,边是字符串或 lambda
  • 基于 StateGraph 加 edge/conditional_edge 配置
  • 生态更大,内建 checkpoint/HITL 能力多
  • 和 LangChain 深度绑定

pydantic_graph

  • 节点是类,边是"下个节点的类型"
  • 没有单独 edge/cond_edge——全在 return 类型里
  • 代码更薄,学习曲线更短,mypy 检查更严
  • 独立子包,可以不跟 Pydantic AI 一起用

选型建议:

十、实战:客服分流 Agent

一个中等复杂度的真实场景——客服 bot 按流程处理:

  1. 识别用户诉求(售前/售后/投诉)
  2. 售前/售后走对应 Agent
  3. 投诉升级到人工队列,持久化等人工接手
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

@dataclass
class CsState:
    user_msg: str = ""
    category: str = ""
    handled_by: str = ""
    reply: str = ""

class Category(BaseModel):
    kind: Literal["presale", "aftersale", "complaint"]

classifier = Agent("openai:gpt-4o-mini", output_type=Category,
    system_prompt="判断消息类别:售前咨询=presale、售后问题=aftersale、投诉=complaint。")
presale = Agent("openai:gpt-4o-mini", system_prompt="你是售前助手,推销产品特性、解答购买疑问。")
after = Agent("openai:gpt-4o-mini", system_prompt="你是售后助手,处理退货换货、物流查询。")

@dataclass
class Route(BaseNode[CsState]):
    async def run(self, ctx) -> HandlePresale | HandleAfter | EscalateHuman:
        r = await classifier.run(ctx.state.user_msg)
        ctx.state.category = r.output.kind
        if r.output.kind == "presale": return HandlePresale()
        if r.output.kind == "aftersale": return HandleAfter()
        return EscalateHuman()

@dataclass
class HandlePresale(BaseNode[CsState]):
    async def run(self, ctx) -> End[str]:
        r = await presale.run(ctx.state.user_msg)
        ctx.state.reply = r.output
        ctx.state.handled_by = "presale-agent"
        return End(r.output)

@dataclass
class HandleAfter(BaseNode[CsState]):
    async def run(self, ctx) -> End[str]:
        r = await after.run(ctx.state.user_msg)
        ctx.state.reply = r.output
        ctx.state.handled_by = "after-agent"
        return End(r.output)

@dataclass
class EscalateHuman(BaseNode[CsState]):
    async def run(self, ctx) -> End[str]:
        ctx.state.handled_by = "human-queue"
        ctx.state.reply = "已升级到人工,请稍候,工号 12345 将尽快回复。"
        return End(ctx.state.reply)

graph = Graph(nodes=[Route, HandlePresale, HandleAfter, EscalateHuman])

之后每次用户发消息,跑一次 graph.run(Route(), state=CsState(user_msg=...))。图把"路由 + 处理"解耦,逻辑一清二白。

十一、八个常见坑

  1. 忘记 from __future__ import annotations:类互相引用(Chat | Search)会引发前向声明问题。Python 3.11 以下必须加。
  2. State 写太大:每次 checkpoint 都要序列化 State——里面放几 MB 的数据会慢哭。大数据存外部,State 里只放引用/ID。
  3. 用 mutable default 值:field(default_factory=list),不要 = []
  4. 节点直接 return Agent 的结果对象:结果对象不是 BaseNodeEnd——框架会报错。要返回节点或 End 包装。
  5. 忘记把所有节点注册进 Graph(nodes=[...]):运行时抛"未知节点"。
  6. 循环无终止条件:节点总是 return 自己,无限循环。默认 Graph 有最大深度保护,建议自己加显式计数。
  7. deps 里放 Agent:循环引用,序列化/打印时可能栈溢出。Agent 应是模块级全局变量,不进 deps。
  8. 直接在节点 runprint 长日志:checkpoint 的每一步被阻塞。用 logfire.span 做结构化 trace。

十二、本章小结

三条心法:
① Graph 适合多节点 + 条件分支 + 持久化 / HITL 的场景,简单 Agent 用不着。
② 节点是类,返回什么就走到什么——边就是 return 类型签名。
③ State(可变共享状态)+ Deps(不可变外部依赖),职责明确分开。