Chapter 10

实战:研究助手 Agent

整合全程所学,构建一个真实可用的多步骤研究助手:Web 搜索 + 向量检索 + 代码执行 + 报告生成,完整的工作流实现。

项目概览

我们将构建一个"研究助手 Agent",它能够接收一个研究课题,自主完成:搜索最新资料、查询内部知识库、运行数据分析代码、综合生成结构化研究报告。

能力集合:Web 搜索(Tavily API)· 向量知识库检索(Chroma)· Python 代码执行(安全沙箱)· 报告生成与格式化

技术栈:LangGraph 0.2(主图编排)· LangChain 0.3(LLM/工具)· FastAPI(HTTP 接口)· SQLite 检查点(会话持久化)

特性:流式输出 · 多轮对话 · 成本控制 · LangSmith 追踪 · Docker 部署

系统架构

研究助手 Agent 完整架构: ┌─────────────────────────────────────────────────────────────┐ │ ResearchAgent StateGraph │ │ │ │ START → [plan_research] │ │ │ │ │ ▼ │ │ [execute_research] ←──────────────────────┐ │ │ │ │ │ │ ┌───────────┴──────────────┐ │ │ │ │ 工具执行节点 │ │ │ │ │ ┌─────────────────────┐ │ │ │ │ │ │ web_search │ │ │ │ │ │ │ knowledge_search │ │ │ │ │ │ │ execute_python │ │ │ │ │ │ │ save_note │ │ │ │ │ │ └─────────────────────┘ │ │ │ │ └───────────┬──────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ [evaluate_progress] ─── 继续研究 ───────────┘ │ │ │ │ │ 完成/超限 │ │ │ │ │ ▼ │ │ [generate_report] → END │ └─────────────────────────────────────────────────────────────┘ 数据流: 用户输入 → State.topic 研究笔记 → State.research_notes (累积) 工具结果 → State.messages (累积) 最终报告 → State.final_report

完整代码实现

1. 状态定义与工具配置

# research_agent/state.py
from typing import TypedDict, Annotated, List, Optional
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
import operator

class ResearchState(TypedDict):
    # 用户输入
    topic: str
    requirements: str          # 报告要求(格式、重点等)

    # 执行状态
    messages: Annotated[List[BaseMessage], add_messages]
    research_notes: Annotated[List[str], operator.add]  # 累积研究笔记
    iteration: Annotated[int, operator.add]
    research_complete: bool

    # 输出
    final_report: Optional[str]
    sources: Annotated[List[str], operator.add]
# research_agent/tools.py
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
import subprocess, tempfile, os, json
from pydantic import BaseModel, Field

# ── Web 搜索工具 ──────────────────────────────────────────
_tavily = TavilySearchResults(max_results=5, include_raw_content=False)

@tool
def web_search(query: str) -> str:
    """搜索网络获取最新信息。适用于:最新新闻、统计数据、近期事件。"""
    results = _tavily.invoke(query)
    formatted = []
    for r in results:
        formatted.append(f"标题:{r['title']}\n链接:{r['url']}\n摘要:{r['content'][:400]}")
    return "\n\n---\n\n".join(formatted)

# ── 知识库搜索工具 ────────────────────────────────────────
_embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
_vectorstore = Chroma(
    collection_name="research_knowledge",
    embedding_function=_embeddings,
    persist_directory="./knowledge_db"
)

@tool
def knowledge_search(query: str) -> str:
    """搜索内部知识库。适用于:领域背景、技术细节、历史研究。"""
    docs = _vectorstore.similarity_search(query, k=4)
    if not docs:
        return "知识库中未找到相关内容。"
    return "\n\n".join(
        f"[来源:{d.metadata.get('source','未知')}]\n{d.page_content}"
        for d in docs
    )

# ── Python 代码执行工具 ───────────────────────────────────
class ExecutePythonInput(BaseModel):
    code: str = Field(description="要执行的Python代码,用于数据处理和分析")
    description: str = Field(description="代码功能简述")

@tool("execute_python", args_schema=ExecutePythonInput)
def execute_python(code: str, description: str) -> str:
    """执行Python代码进行数据分析和计算。只能使用标准库和numpy/pandas。"""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        # 安全预处理:移除 import os/sys/subprocess 等危险导入
        safe_code = "\n".join(
            line for line in code.splitlines()
            if not any(danger in line
                       for danger in ["import os", "import sys",
                                        "subprocess", "__import__",
                                        "open(", "eval(", "exec("])
        )
        f.write(safe_code)
        fname = f.name

    try:
        result = subprocess.run(
            ["python3", fname],
            capture_output=True, text=True, timeout=30
        )
        output = result.stdout[-2000:] if result.stdout else ""
        error  = result.stderr[-500:]  if result.stderr  else ""
        return f"输出:\n{output}" + (f"\n错误:{error}" if error else "")
    finally:
        os.unlink(fname)

# ── 笔记保存工具 ──────────────────────────────────────────
@tool
def save_research_note(note: str) -> str:
    """保存重要的研究发现到笔记本,供后续报告生成使用。"""
    # 工具调用后,通过 State 的 research_notes 字段自动累积
    return f"已保存笔记:{note[:50]}..."

RESEARCH_TOOLS = [web_search, knowledge_search, execute_python, save_research_note]

2. Agent 节点与图构建

# research_agent/graph.py
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

SYSTEM_PROMPT = """你是一位专业研究助手,擅长深度分析和综合信息。

研究方法:
1. 先用 web_search 获取最新信息
2. 用 knowledge_search 补充背景知识
3. 需要数据分析时使用 execute_python
4. 每找到重要发现,用 save_research_note 记录
5. 搜集足够信息后(通常3-5轮),直接输出最终报告

报告格式要求:
- 使用 Markdown 格式
- 包含执行摘要、详细分析、结论与建议
- 标注所有数据来源
- 客观、准确、有深度"""

def build_research_graph(db_path: str = "research.db"):
    llm = ChatOpenAI(model="gpt-4o", temperature=0.2, streaming=True)
    llm_with_tools = llm.bind_tools(RESEARCH_TOOLS)

    def plan_research(state: ResearchState):
        """初始规划节点:分析课题,制定研究策略。"""
        planning_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        plan = planning_llm.invoke([
            SystemMessage(content="分析研究课题,列出关键问题和搜索策略(3-5条)"),
            HumanMessage(content=
                f"课题:{state['topic']}\n要求:{state.get('requirements', '全面、客观')}"
            )
        ])
        initial_msg = HumanMessage(content=
            f"请研究:{state['topic']}\n"
            f"要求:{state.get('requirements', '全面、客观、有深度')}\n"
            f"研究计划参考:\n{plan.content}"
        )
        return {"messages": [initial_msg]}

    def execute_research(state: ResearchState):
        """主研究节点:LLM 决定调用哪些工具。"""
        messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
        response = llm_with_tools.invoke(messages)
        return {"messages": [response], "iteration": 1}

    tool_node = ToolNode(RESEARCH_TOOLS, handle_tool_errors=True)

    def evaluate_progress(state: ResearchState) -> str:
        """评估研究进度,决定是否继续或生成报告。"""
        last_msg = state["messages"][-1]

        # 强制终止条件
        if state["iteration"] >= 12:
            return "generate"

        # LLM 完成研究(无工具调用 = 已有足够信息开始写报告)
        if not last_msg.tool_calls:
            return "generate"

        # 继续使用工具
        return "continue"

    def generate_report(state: ResearchState):
        """报告生成节点:综合所有研究结果,输出结构化报告。"""
        # 构建最终报告提示
        notes_text = "\n\n".join(state.get("research_notes", []))
        recent_msgs = state["messages"][-20:]  # 取最近的对话历史

        report_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
        report = report_llm.invoke([
            SystemMessage(content="""你是专业报告撰写人。根据研究数据生成完整报告。
            报告结构:
            # 执行摘要(150字)
            # 研究背景
            # 核心发现(数据支撑)
            # 深度分析
            # 结论与建议
            # 参考来源"""),
            HumanMessage(content=
                f"研究课题:{state['topic']}\n\n"
                f"研究笔记:\n{notes_text}\n\n"
                f"请生成完整报告。"
            )
        ])
        return {
            "final_report": report.content,
            "research_complete": True
        }

    # ── 构建图 ───────────────────────────────────────────────
    graph_builder = StateGraph(ResearchState)
    graph_builder.add_node("plan",     plan_research)
    graph_builder.add_node("research", execute_research)
    graph_builder.add_node("tools",    tool_node)
    graph_builder.add_node("report",   generate_report)

    graph_builder.add_edge(START, "plan")
    graph_builder.add_edge("plan", "research")
    graph_builder.add_conditional_edges("research", evaluate_progress,
        {"continue": "tools", "generate": "report"})
    graph_builder.add_edge("tools", "research")
    graph_builder.add_edge("report", END)

    conn = sqlite3.connect("research.db", check_same_thread=False)
    checkpointer = SqliteSaver(conn)
    return graph_builder.compile(checkpointer=checkpointer)

3. FastAPI 接口层

# research_agent/api.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio, uuid, json

app = FastAPI(title="Research Agent API")
research_graph = build_research_graph()

class ResearchRequest(BaseModel):
    topic: str
    requirements: str = "全面、客观、有深度,使用数据支撑观点"
    session_id: str = ""

@app.post("/research")
async def start_research(req: ResearchRequest):
    """启动研究任务,返回流式进度更新。"""
    session_id = req.session_id or str(uuid.uuid4())
    config = {"configurable": {"thread_id": session_id}}

    async def generate_stream():
        async for chunk in research_graph.astream(
            {
                "topic": req.topic,
                "requirements": req.requirements,
                "iteration": 0,
                "research_notes": [],
                "sources": [],
                "research_complete": False,
                "messages": [],
            },
            config=config,
            stream_mode="updates"
        ):
            for node, data in chunk.items():
                event = {
                    "node": node,
                    "session_id": session_id,
                }
                if node == "report" and "final_report" in data:
                    event["type"] = "complete"
                    event["report"] = data["final_report"]
                elif "messages" in data:
                    last = data["messages"][-1]
                    event["type"] = "progress"
                    event["message"] = last.content[:200]
                else:
                    continue
                yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

    return StreamingResponse(generate_stream(), media_type="text/event-stream")

@app.get("/research/{session_id}")
async def get_research_result(session_id: str):
    """获取已完成的研究报告。"""
    config = {"configurable": {"thread_id": session_id}}
    state = research_graph.get_state(config)
    if not state or not state.values.get("final_report"):
        raise HTTPException(404, "研究结果不存在或尚未完成")
    return {
        "session_id": session_id,
        "topic": state.values["topic"],
        "report": state.values["final_report"],
        "iterations": state.values.get("iteration", 0),
    }

4. 运行与测试

# 启动 API 服务
# uvicorn research_agent.api:app --reload --port 8000

# 调用示例(Python 客户端)
import httpx, json

async def research_client(topic: str):
    async with httpx.AsyncClient(timeout=300) as client:
        async with client.stream(
            "POST", "http://localhost:8000/research",
            json={"topic": topic, "requirements": "重点分析技术趋势和市场机会"}
        ) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data:"):
                    event = json.loads(line[6:])
                    if event["type"] == "progress":
                        print(f"[{event['node']}] {event['message']}")
                    elif event["type"] == "complete":
                        print("\n=== 研究报告 ===")
                        print(event["report"])
                        break

asyncio.run(research_client("2025年 AI Agent 框架发展趋势"))
项目扩展方向 本实战项目可以继续扩展:1) 添加 PDF 上传和解析(用于研究特定文档);2) 集成图表生成(matplotlib 代码执行 + 图片返回);3) 多用户隔离(每用户独立向量库);4) 研究计划的用户确认(Human-in-the-Loop);5) 报告导出 PDF(通过 LaTeX 或 Pandoc)。

课程总结:框架选型指南

决策树:选择合适的 Agent 框架 你的任务是什么? │ ├─ 代码生成/调试/数据分析 → AutoGen 0.4 │ (需要代码执行循环,自动验证结果) │ ├─ 多角色协作(写作/研究/内容生成)→ CrewAI 0.7 │ (明确的角色分工,任务有先后依赖) │ ├─ 复杂业务流程(条件分支多,状态复杂)→ LangGraph 0.2 │ (需要精确控制每一步,生产级别) │ └─ 快速原型/简单对话 → LangChain AgentExecutor (验证想法,不需要复杂状态管理) 组合原则: - LangGraph 作为底层编排引擎(几乎适合所有场景) - CrewAI 的角色设计理念可以在 LangGraph 中实现 - AutoGen 专注于代码生成场景难以替代 - 不要为了使用框架而使用框架:简单任务用 Chain!
恭喜完成全部课程! 你已经掌握了 AI Agent 开发的核心知识:从 ReAct 范式到 LangGraph 状态机,从工具调用到多 Agent 协作,从记忆系统到生产部署。Agent 工程是一个快速发展的领域,保持关注官方文档和社区动态,持续实践才是最好的学习方式。