需求分析
产品需求:
- 用户自然语言提问 → Agent 自动处理
- 支持:查订单 / 查物流 / 退款 / 转人工四类意图
- 退款金额 ≤ ¥500 自动办理,> ¥500 需经理审批
- 前端 SSE 流式展示"思考过程"
- 会话可断可续(Postgres Checkpointer)
- 上 LangSmith 做 trace 和成本监控
整体架构
┌─── Frontend (React) ───┐
│ 聊天窗 + 审批弹窗 │
└──────────┬─────────────┘
│ SSE
┌──────────▼─────────────┐
│ FastAPI (uvicorn) │
│ /chat /resume /state│
└──────────┬─────────────┘
│
┌──────────▼─────────────┐ ┌─────────────┐
│ LangGraph App │◀────────▶│ Postgres │
│ classify → dispatch │ ckpt │ checkpoints │
│ → [search|refund|…] │ └─────────────┘
│ HITL interrupt │
└──────────┬─────────────┘
│
┌──────────▼─────────────┐
│ LangSmith Trace │
└────────────────────────┘
Step 1:定义 State
from typing import TypedDict, Annotated, Literal

from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    """Shared conversation state flowing through every node of the graph."""

    # Full chat history; add_messages makes updates append, not overwrite.
    messages: Annotated[list, add_messages]
    # Authenticated customer id (used for tickets and refund audit trail).
    user_id: str
    # Result of the classify node.
    intent: Literal["order", "shipping", "refund", "human", "other"]
    # Extracted by classify; None until known.
    order_id: str | None
    refund_amount: float | None
    # Approval outcome ("approve" or a rejection marker) from the HITL branch.
    decision: str | None
Step 2:工具层
from langchain_core.tools import tool


@tool
def get_order(order_id: str) -> str:
    """查订单基本信息,返回 markdown。"""
    # The docstring above is sent to the LLM as the tool description.
    record = db.orders.get(order_id)
    if not record:
        return f"未找到订单 {order_id}"
    return (
        f"订单 {record.id}\n金额: ¥{record.amount}\n"
        f"状态: {record.status}\n下单: {record.created_at}"
    )


@tool
def get_shipping(order_id: str) -> str:
    """查物流进度。"""
    events = logistics.query(order_id)
    lines = [f"{e.time} {e.city} {e.msg}" for e in events]
    return "\n".join(lines)


@tool
def create_refund(order_id: str, amount: float, reason: str) -> str:
    """创建退款单。"""
    # The idempotency key guards against double refunds when the node retries.
    try:
        refund_id = refund_api.create(
            order_id, amount, reason, idem_key=f"{order_id}:{amount}"
        )
    except ApiError as exc:
        return f"创建失败: {exc}"
    return f"退款单 {refund_id} 已创建"
Step 3:节点
from langchain_core.messages import AIMessage  # FIX: was used below but never imported
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.types import interrupt, Command

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# —— 1. Intent classification ——
def classify(s):
    """Classify the user's intent and extract order_id / refund_amount."""
    rsp = llm.with_structured_output(IntentSchema).invoke([
        ("system", "判断用户意图: order|shipping|refund|human|other,并提取 order_id/refund_amount"),
        *s["messages"],
    ])
    return {
        "intent": rsp.intent,
        "order_id": rsp.order_id,
        "refund_amount": rsp.refund_amount,
    }


# —— 2. Routing ——
def route(s) -> Literal["qa_agent", "refund_flow", "handoff"]:
    """Pick the branch to run based on the classified intent."""
    if s["intent"] == "refund":
        return "refund_flow"
    if s["intent"] == "human":
        return "handoff"
    return "qa_agent"


# —— 3. QA agent (order / shipping lookups) ——
qa_agent = create_react_agent(
    model="openai:gpt-4o-mini",
    tools=[get_order, get_shipping],
    prompt="你是客服,根据用户问题调用工具后简洁回复。",
)


# —— 4. Refund flow ——
def check_amount(s) -> Literal["auto_refund", "approve_refund"]:
    """Small refunds are auto-approved; large or unknown amounts need a human.

    BUG FIX: refund_amount can be None when extraction fails, and comparing
    None <= 500 raised TypeError. Unknown amounts now route to approval.
    """
    amount = s.get("refund_amount")
    if amount is not None and amount <= 500:
        return "auto_refund"
    return "approve_refund"


def approve_refund(s):
    """Pause the graph until a manager resumes with a decision (HITL).

    The payload carries everything the approver needs to decide.
    """
    decision = interrupt({
        "type": "approve_refund",
        "order_id": s["order_id"],
        "amount": s["refund_amount"],
        "user_id": s["user_id"],
    })
    return {"decision": decision}


def auto_refund(s):
    """Small amounts: approve without human involvement."""
    return {"decision": "approve"}


def do_refund(s):
    """Execute (or decline) the refund based on the recorded decision."""
    if s["decision"] != "approve":
        return {"messages": [AIMessage(content="退款申请已驳回。")]}
    result = create_refund.invoke({
        "order_id": s["order_id"],
        "amount": s["refund_amount"],
        "reason": "客服审批通过",
    })
    return {"messages": [AIMessage(content=result)]}


# —— 5. Human handoff ——
def handoff(s):
    """Open a ticket with full history and tell the user the ticket number."""
    ticket = ticket_sys.create(user_id=s["user_id"], history=s["messages"])
    return {"messages": [AIMessage(content=f"已为您转接人工,工单号 {ticket.id}")]}
Step 4:拼图
from langgraph.graph import StateGraph, START, END
from langgraph.pregel import RetryPolicy

g = StateGraph(AgentState)

# Nodes. "refund_flow" is a no-op hub whose only job is to let one
# conditional edge fan out into the auto / approval sub-paths.
g.add_node("classify", classify)
g.add_node("qa_agent", qa_agent)
g.add_node("refund_flow", lambda s: {})
g.add_node("auto_refund", auto_refund)
g.add_node("approve_refund", approve_refund)
# do_refund calls an external API, so it gets a retry policy.
g.add_node("do_refund", do_refund, retry=RetryPolicy(max_attempts=3))
g.add_node("handoff", handoff)

# Edges.
g.add_edge(START, "classify")
g.add_conditional_edges("classify", route)
g.add_conditional_edges("refund_flow", check_amount)
for src, dst in [
    ("auto_refund", "do_refund"),
    ("approve_refund", "do_refund"),
    ("do_refund", END),
    ("qa_agent", END),
    ("handoff", END),
]:
    g.add_edge(src, dst)
Step 5:加 Checkpointer + 编译
from contextlib import AsyncExitStack

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# NOTE(review): DSN is hard-coded while docker-compose injects DB_URL —
# consider reading os.environ["DB_URL"] so the compose value takes effect.
DB = "postgresql://app:pwd@db/agent"

# Keeps the saver's connection context alive for the process lifetime.
# BUG FIX: the original called __aenter__() and dropped the context
# manager, so __aexit__ could never run and the connection leaked.
_ckpt_stack = AsyncExitStack()


async def build_app():
    """Open the Postgres checkpointer and compile the graph with it."""
    ckpt = await _ckpt_stack.enter_async_context(
        AsyncPostgresSaver.from_conn_string(DB)
    )
    await ckpt.setup()  # create checkpoint tables if missing
    return g.compile(checkpointer=ckpt)


async def close_app() -> None:
    """Release the checkpointer's DB connection (call on app shutdown)."""
    await _ckpt_stack.aclose()
Step 6:FastAPI 对外
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json

api = FastAPI()
APP = None  # compiled LangGraph app; populated on startup


@api.on_event("startup")
async def startup():
    global APP
    APP = await build_app()


class ChatReq(BaseModel):
    thread_id: str  # conversation boundary == checkpointer thread
    user_id: str
    query: str


def sse(t, data):
    """Format one SSE frame; 'type' tells the frontend how to render it."""
    return f"data: {json.dumps({'type':t, **data})}\n\n"


@api.post("/chat")
async def chat(req: ChatReq):
    """Run one turn of the agent, streaming its progress to the client."""
    cfg = {
        "configurable": {"thread_id": req.thread_id},
        "tags": ["prod"],                      # LangSmith filtering
        "metadata": {"user_id": req.user_id},  # per-user cost attribution
    }
    inp = {"messages": [("user", req.query)], "user_id": req.user_id}

    async def gen():
        async for ev in APP.astream_events(inp, cfg, version="v2"):
            t = ev["event"]
            if t == "on_chat_model_stream":
                yield sse("token", {"c": ev["data"]["chunk"].content})
            elif t == "on_tool_start":
                yield sse("tool_start", {"name": ev["name"]})
            elif t == "on_tool_end":
                yield sse("tool_end", {"name": ev["name"],
                                       "out": str(ev["data"]["output"])[:300]})
        # After the stream drains, surface a pending HITL interrupt (if any)
        # so the frontend can open the approval dialog.
        snap = await APP.aget_state(cfg)
        # BUG FIX: the interrupt is not necessarily on tasks[0]; pick the
        # first task that actually carries one.
        pending = next((tk for tk in snap.tasks if tk.interrupts), None)
        if pending is not None:
            yield sse("needs_human", pending.interrupts[0].value)
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream",
                             headers={"X-Accel-Buffering": "no"})


@api.post("/resume")
async def resume(thread_id: str, decision: str):
    """Resume an interrupted thread with the approver's decision."""
    from langgraph.types import Command

    cfg = {"configurable": {"thread_id": thread_id}}
    result = await APP.ainvoke(Command(resume=decision), cfg)
    return {"result": result}
Step 7:部署
Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Enable LangSmith tracing; the API key is injected at runtime.
ENV LANGCHAIN_TRACING_V2=true

CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
docker-compose.yml
services:
  app:
    build: .
    ports: ["8000:8000"]
    environment:
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}  # LangSmith tracing
      LANGCHAIN_PROJECT: customer-support-prod
      # NOTE(review): the app code hard-codes its DSN; make sure it reads
      # DB_URL so this value is actually used.
      DB_URL: postgresql://app:pwd@db:5432/agent
    depends_on: [db]

  db:
    image: postgres:16
    environment:
      POSTGRES_USER: app
      POSTGRES_PASSWORD: pwd
      POSTGRES_DB: agent
    # Persist checkpoints across container restarts.
    volumes: ["pgdata:/var/lib/postgresql/data"]

volumes:
  pgdata:
Nginx 反代(关键:关缓冲)
location /chat {
    proxy_pass http://app:8000;
    # SSE: disable buffering/caching so tokens reach the browser immediately.
    proxy_buffering off;
    proxy_cache off;
    # Long-lived streams: keep the upstream read open for up to 10 minutes.
    proxy_read_timeout 10m;
    chunked_transfer_encoding on;
}
Step 8:验收测试
def test_small_refund_auto():
    """Refund ≤ ¥500: completes end-to-end with no human interrupt."""
    inp = {"messages": [("user", "退 A1 订单,质量问题,200 块")], "user_id": "u1"}
    out = app.invoke(inp, cfg("t1"))
    assert "退款单" in out["messages"][-1].content
    assert not out.get("__interrupt__")


def test_large_refund_needs_approval():
    """Refund > ¥500: graph pauses for approval, then resumes to completion."""
    inp = {"messages": [("user", "退 A2,5000 块")], "user_id": "u2"}
    out = app.invoke(inp, cfg("t2"))
    assert out["__interrupt__"][0].value["type"] == "approve_refund"
    out2 = app.invoke(Command(resume="approve"), cfg("t2"))
    assert "退款单" in out2["messages"][-1].content


def test_qa_flow():
    """Order/shipping questions are handled by the QA agent."""
    out = app.invoke(
        {"messages": [("user", "A1 物流到哪了?")], "user_id": "u3"},
        cfg("t3"),
    )
    reply = out["messages"][-1].content
    assert "订单" in reply or "物流" in reply
上线清单
- ✅ Checkpointer 用 Postgres,不要 MemorySaver
- ✅ 关键节点(do_refund、外部 API)都加 retry
- ✅ 工具全部幂等(带 idempotency key)
- ✅ HITL 审批节点用 interrupt(),payload 含决策所需全部信息
- ✅ 前端 SSE 关 nginx 缓冲,加心跳
- ✅ LangSmith 打 tag/metadata,成本可追溯
- ✅ 单测覆盖 happy path + HITL 分支 + 转人工 + 退款驳回
- ✅ 定期清理 90 天前 checkpoint
- ✅ 成本告警:单会话 token > 10k 报警
- ✅ 降级预案:LLM 宕机直接转人工
扩展思路
- 多语言:state 加 `lang` 字段,prompt_fn 按 lang 切换 system message
- 长期记忆:用 `Store` 存用户偏好("这个客户以前投诉过 3 次")
- 分层 Agent:复杂工单拆成 Supervisor + QA + Refund 三子 Agent(Ch8)
- 评估集:每周从生产抽 100 条跑 LangSmith eval,比较新版本
- A/B:`config["metadata"]["variant"] = "v2"`,LangSmith 按 variant 聚合
全书总结
- Ch1-2:LangGraph 是状态图,三件套 State/Node/Edge 就是全部
- Ch3:条件边 + 循环 + Command,让图会思考
- Ch4:ReAct 是 Agent 的标准范式,prebuilt 一行,需要时自己画
- Ch5:Checkpointer 是生产分水岭,thread_id 是会话边界
- Ch6:interrupt() 让 Agent 会"等人",审批/编辑/补充
- Ch7:流式是 UX 天花板,astream_events 最细粒度
- Ch8:多 Agent 按需上,Supervisor 最常见,子图是积木
- Ch9:LangSmith + OTel 是"白盒化"的钥匙
- Ch10:串起来就是生产级 Agent
最后一句话
LangGraph 本身不难——难的是把 LLM 的不确定性塞进一张确定的图里。 记住:图是骨,LLM 是肉,工具是手,人是头。每次 Agent 出问题,先问这四个里哪个没管好。