项目概览
我们将构建一个"研究助手 Agent",它能够接收一个研究课题,自主完成:搜索最新资料、查询内部知识库、运行数据分析代码、综合生成结构化研究报告。
能力集合:Web 搜索(Tavily API)· 向量知识库检索(Chroma)· Python 代码执行(安全沙箱)· 报告生成与格式化
技术栈:LangGraph 0.2(主图编排)· LangChain 0.3(LLM/工具)· FastAPI(HTTP 接口)· SQLite 检查点(会话持久化)
特性:流式输出 · 多轮对话 · 成本控制 · LangSmith 追踪 · Docker 部署
系统架构
研究助手 Agent 完整架构:
┌─────────────────────────────────────────────────────────────┐
│ ResearchAgent StateGraph │
│ │
│ START → [plan_research] │
│ │ │
│ ▼ │
│ [execute_research] ←──────────────────────┐ │
│ │ │ │
│ ┌───────────┴──────────────┐ │ │
│ │ 工具执行节点 │ │ │
│ │ ┌─────────────────────┐ │ │ │
│ │ │ web_search │ │ │ │
│ │ │ knowledge_search │ │ │ │
│ │ │ execute_python │ │ │ │
│ │ │ save_note │ │ │ │
│ │ └─────────────────────┘ │ │ │
│ └───────────┬──────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ [evaluate_progress] ─── 继续研究 ───────────┘ │
│ │ │
│ 完成/超限 │
│ │ │
│ ▼ │
│ [generate_report] → END │
└─────────────────────────────────────────────────────────────┘
数据流:
用户输入 → State.topic
研究笔记 → State.research_notes (累积)
工具结果 → State.messages (累积)
最终报告 → State.final_report
完整代码实现
1. 状态定义与工具配置
# research_agent/state.py
from typing import TypedDict, Annotated, List, Optional
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
import operator
class ResearchState(TypedDict):
# 用户输入
topic: str
requirements: str # 报告要求(格式、重点等)
# 执行状态
messages: Annotated[List[BaseMessage], add_messages]
research_notes: Annotated[List[str], operator.add] # 累积研究笔记
iteration: Annotated[int, operator.add]
research_complete: bool
# 输出
final_report: Optional[str]
sources: Annotated[List[str], operator.add]
# research_agent/tools.py
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
import subprocess, tempfile, os, json
from pydantic import BaseModel, Field
# ── Web 搜索工具 ──────────────────────────────────────────
_tavily = TavilySearchResults(max_results=5, include_raw_content=False)
@tool
def web_search(query: str) -> str:
"""搜索网络获取最新信息。适用于:最新新闻、统计数据、近期事件。"""
results = _tavily.invoke(query)
formatted = []
for r in results:
formatted.append(f"标题:{r['title']}\n链接:{r['url']}\n摘要:{r['content'][:400]}")
return "\n\n---\n\n".join(formatted)
# ── 知识库搜索工具 ────────────────────────────────────────
_embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
_vectorstore = Chroma(
collection_name="research_knowledge",
embedding_function=_embeddings,
persist_directory="./knowledge_db"
)
@tool
def knowledge_search(query: str) -> str:
"""搜索内部知识库。适用于:领域背景、技术细节、历史研究。"""
docs = _vectorstore.similarity_search(query, k=4)
if not docs:
return "知识库中未找到相关内容。"
return "\n\n".join(
f"[来源:{d.metadata.get('source','未知')}]\n{d.page_content}"
for d in docs
)
# ── Python 代码执行工具 ───────────────────────────────────
class ExecutePythonInput(BaseModel):
code: str = Field(description="要执行的Python代码,用于数据处理和分析")
description: str = Field(description="代码功能简述")
@tool("execute_python", args_schema=ExecutePythonInput)
def execute_python(code: str, description: str) -> str:
"""执行Python代码进行数据分析和计算。只能使用标准库和numpy/pandas。"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
# 安全预处理:移除 import os/sys/subprocess 等危险导入
safe_code = "\n".join(
line for line in code.splitlines()
if not any(danger in line
for danger in ["import os", "import sys",
"subprocess", "__import__",
"open(", "eval(", "exec("])
)
f.write(safe_code)
fname = f.name
try:
result = subprocess.run(
["python3", fname],
capture_output=True, text=True, timeout=30
)
output = result.stdout[-2000:] if result.stdout else ""
error = result.stderr[-500:] if result.stderr else ""
return f"输出:\n{output}" + (f"\n错误:{error}" if error else "")
finally:
os.unlink(fname)
# ── 笔记保存工具 ──────────────────────────────────────────
@tool
def save_research_note(note: str) -> str:
"""保存重要的研究发现到笔记本,供后续报告生成使用。"""
# 工具调用后,通过 State 的 research_notes 字段自动累积
return f"已保存笔记:{note[:50]}..."
RESEARCH_TOOLS = [web_search, knowledge_search, execute_python, save_research_note]
2. Agent 节点与图构建
# research_agent/graph.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
SYSTEM_PROMPT = """你是一位专业研究助手,擅长深度分析和综合信息。
研究方法:
1. 先用 web_search 获取最新信息
2. 用 knowledge_search 补充背景知识
3. 需要数据分析时使用 execute_python
4. 每找到重要发现,用 save_research_note 记录
5. 搜集足够信息后(通常3-5轮),直接输出最终报告
报告格式要求:
- 使用 Markdown 格式
- 包含执行摘要、详细分析、结论与建议
- 标注所有数据来源
- 客观、准确、有深度"""
def build_research_graph(db_path: str = "research.db"):
llm = ChatOpenAI(model="gpt-4o", temperature=0.2, streaming=True)
llm_with_tools = llm.bind_tools(RESEARCH_TOOLS)
def plan_research(state: ResearchState):
"""初始规划节点:分析课题,制定研究策略。"""
planning_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
plan = planning_llm.invoke([
SystemMessage(content="分析研究课题,列出关键问题和搜索策略(3-5条)"),
HumanMessage(content=
f"课题:{state['topic']}\n要求:{state.get('requirements', '全面、客观')}"
)
])
initial_msg = HumanMessage(content=
f"请研究:{state['topic']}\n"
f"要求:{state.get('requirements', '全面、客观、有深度')}\n"
f"研究计划参考:\n{plan.content}"
)
return {"messages": [initial_msg]}
def execute_research(state: ResearchState):
"""主研究节点:LLM 决定调用哪些工具。"""
messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response], "iteration": 1}
tool_node = ToolNode(RESEARCH_TOOLS, handle_tool_errors=True)
def evaluate_progress(state: ResearchState) -> str:
"""评估研究进度,决定是否继续或生成报告。"""
last_msg = state["messages"][-1]
# 强制终止条件
if state["iteration"] >= 12:
return "generate"
# LLM 完成研究(无工具调用 = 已有足够信息开始写报告)
if not last_msg.tool_calls:
return "generate"
# 继续使用工具
return "continue"
def generate_report(state: ResearchState):
"""报告生成节点:综合所有研究结果,输出结构化报告。"""
# 构建最终报告提示
notes_text = "\n\n".join(state.get("research_notes", []))
recent_msgs = state["messages"][-20:] # 取最近的对话历史
report_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
report = report_llm.invoke([
SystemMessage(content="""你是专业报告撰写人。根据研究数据生成完整报告。
报告结构:
# 执行摘要(150字)
# 研究背景
# 核心发现(数据支撑)
# 深度分析
# 结论与建议
# 参考来源"""),
HumanMessage(content=
f"研究课题:{state['topic']}\n\n"
f"研究笔记:\n{notes_text}\n\n"
f"请生成完整报告。"
)
])
return {
"final_report": report.content,
"research_complete": True
}
# ── 构建图 ───────────────────────────────────────────────
graph_builder = StateGraph(ResearchState)
graph_builder.add_node("plan", plan_research)
graph_builder.add_node("research", execute_research)
graph_builder.add_node("tools", tool_node)
graph_builder.add_node("report", generate_report)
graph_builder.add_edge(START, "plan")
graph_builder.add_edge("plan", "research")
graph_builder.add_conditional_edges("research", evaluate_progress,
{"continue": "tools", "generate": "report"})
graph_builder.add_edge("tools", "research")
graph_builder.add_edge("report", END)
conn = sqlite3.connect("research.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
return graph_builder.compile(checkpointer=checkpointer)
3. FastAPI 接口层
# research_agent/api.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio, uuid, json
app = FastAPI(title="Research Agent API")
research_graph = build_research_graph()
class ResearchRequest(BaseModel):
topic: str
requirements: str = "全面、客观、有深度,使用数据支撑观点"
session_id: str = ""
@app.post("/research")
async def start_research(req: ResearchRequest):
"""启动研究任务,返回流式进度更新。"""
session_id = req.session_id or str(uuid.uuid4())
config = {"configurable": {"thread_id": session_id}}
async def generate_stream():
async for chunk in research_graph.astream(
{
"topic": req.topic,
"requirements": req.requirements,
"iteration": 0,
"research_notes": [],
"sources": [],
"research_complete": False,
"messages": [],
},
config=config,
stream_mode="updates"
):
for node, data in chunk.items():
event = {
"node": node,
"session_id": session_id,
}
if node == "report" and "final_report" in data:
event["type"] = "complete"
event["report"] = data["final_report"]
elif "messages" in data:
last = data["messages"][-1]
event["type"] = "progress"
event["message"] = last.content[:200]
else:
continue
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
return StreamingResponse(generate_stream(), media_type="text/event-stream")
@app.get("/research/{session_id}")
async def get_research_result(session_id: str):
"""获取已完成的研究报告。"""
config = {"configurable": {"thread_id": session_id}}
state = research_graph.get_state(config)
if not state or not state.values.get("final_report"):
raise HTTPException(404, "研究结果不存在或尚未完成")
return {
"session_id": session_id,
"topic": state.values["topic"],
"report": state.values["final_report"],
"iterations": state.values.get("iteration", 0),
}
4. 运行与测试
# 启动 API 服务
# uvicorn research_agent.api:app --reload --port 8000
# 调用示例(Python 客户端)
import httpx, json
async def research_client(topic: str):
async with httpx.AsyncClient(timeout=300) as client:
async with client.stream(
"POST", "http://localhost:8000/research",
json={"topic": topic, "requirements": "重点分析技术趋势和市场机会"}
) as resp:
async for line in resp.aiter_lines():
if line.startswith("data:"):
event = json.loads(line[6:])
if event["type"] == "progress":
print(f"[{event['node']}] {event['message']}")
elif event["type"] == "complete":
print("\n=== 研究报告 ===")
print(event["report"])
break
asyncio.run(research_client("2025年 AI Agent 框架发展趋势"))
项目扩展方向
本实战项目可以继续扩展:1) 添加 PDF 上传和解析(用于研究特定文档);2) 集成图表生成(matplotlib 代码执行 + 图片返回);3) 多用户隔离(每用户独立向量库);4) 研究计划的用户确认(Human-in-the-Loop);5) 报告导出 PDF(通过 LaTeX 或 Pandoc)。
课程总结:框架选型指南
决策树:选择合适的 Agent 框架
你的任务是什么?
│
├─ 代码生成/调试/数据分析 → AutoGen 0.4
│ (需要代码执行循环,自动验证结果)
│
├─ 多角色协作(写作/研究/内容生成)→ CrewAI 0.7
│ (明确的角色分工,任务有先后依赖)
│
├─ 复杂业务流程(条件分支多,状态复杂)→ LangGraph 0.2
│ (需要精确控制每一步,生产级别)
│
└─ 快速原型/简单对话 → LangChain AgentExecutor
(验证想法,不需要复杂状态管理)
组合原则:
- LangGraph 作为底层编排引擎(几乎适合所有场景)
- CrewAI 的角色设计理念可以在 LangGraph 中实现
- AutoGen 专注于代码生成场景难以替代
- 不要为了使用框架而使用框架:简单任务用 Chain!
恭喜完成全部课程!
你已经掌握了 AI Agent 开发的核心知识:从 ReAct 范式到 LangGraph 状态机,从工具调用到多 Agent 协作,从记忆系统到生产部署。Agent 工程是一个快速发展的领域,保持关注官方文档和社区动态,持续实践才是最好的学习方式。