Chapter 06

流式与消息历史

聊天体验的核心是即时反馈 + 记忆上下文。这一章讲 Pydantic AI 的两个对应机制:run_stream 吐流,message_history 拼历史。

一、为什么要流式?

LLM 生成一段 500 字的回答,端到端可能要 5 秒。如果你等它全部生成完再一次性返回——前端白屏 5 秒,用户以为崩了。

流式的价值是首字节时间(TTFB)从几秒降到亚秒:

非流式: [send]───────────────────────────────[resp 5s 后] 流式: [send]──[token 1 @ 0.3s]──[token 2 @ 0.35s]──... 持续到结束

用户看到"正在打字"的感觉立刻出现,即使总时长没变,体感延迟大幅下降。这也是 ChatGPT 之所以火的交互设计精髓。

二、run_stream 最小示例

import asyncio
from pydantic_ai import Agent

agent = Agent("openai:gpt-4o-mini")

async def main():
    async with agent.run_stream("给我写一段 200 字关于唐诗的介绍") as response:
        async for chunk in response.stream_text(delta=True):
            print(chunk, end="", flush=True)
    print()
    print("用量:", response.usage())

asyncio.run(main())

几个关键点:

三、delta=True vs delta=False

模式每次 yield 的内容典型用法
delta=True自上次以来的新增字符串前端 SSE / 控制台打字机效果
delta=False(默认)到目前为止的全部字符串需要每次拿完整文本重渲染的场景
SSE / WebSocket 推给前端时一般用 delta=True——让前端只 append,不要重建 DOM。而如果要每 chunk 都做一次 markdown 重渲染,用 delta=False 更省心。

四、结构化流:边出边校验

能流式的不只是文本。Pydantic AI 也能流式吐结构化输出——每一个 chunk 都试图被解析成 output_type 的"部分值":

from pydantic import BaseModel

class Recipe(BaseModel):
    name: str
    ingredients: list[str]
    steps: list[str]

agent = Agent("openai:gpt-4o", output_type=Recipe)

async with agent.run_stream("给我一份麻婆豆腐的做法") as response:
    async for partial in response.stream_structured(debounce_by=0.05):
        print("名字:", partial.output.name)
        print("已出 ingredients:", partial.output.ingredients)
        print("---")

你会看到 name 先出现,然后 ingredients 一个一个被追加,最后 steps 填满。每个中间 partial 都是一个"到目前为止尽量校验过"的 Recipe——必填字段如果还没出,可能是空字符串或空列表。

debounce_by=0.05 是防抖:不管底层流多快,至少间隔 50ms 才 yield 一次,避免前端消息过多。

结构化流的实战价值

想象一个"AI 简历解析器"——用户上传一段自我介绍,你用结构化 Agent 拆成 Resume 模型。传统做法前端要等几秒才出完整表单;结构化流的做法是:姓名先填上,工作经历一条一条加上来——和 ChatGPT 用户体验完全对齐。

五、提前结束 / 取消流

用户点了"停止生成"按钮?直接 break:

async with agent.run_stream("...") as response:
    async for chunk in response.stream_text(delta=True):
        print(chunk, end="")
        if user_clicked_stop:
            break
# 离开 async with 后,底层 HTTP 连接自动关闭

已经消费的 token 照样记账——LLM 那边还是跑完了的,但你不再接收。这是 provider 的协议规定,所有流式框架一致。

六、流式 + FastAPI:标准 SSE 响应

把 Pydantic AI 的流接到 FastAPI 的 StreamingResponse:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(question: str):
    async def sse_gen():
        async with agent.run_stream(question) as response:
            async for chunk in response.stream_text(delta=True):
                yield f"data: {chunk}\n\n"
        yield "event: done\ndata: \n\n"

    return StreamingResponse(sse_gen(), media_type="text/event-stream")

前端用 EventSource 消费:

const es = new EventSource('/chat?question=...');
es.onmessage = (e) => { output.innerText += e.data; };
es.addEventListener('done', () => es.close());

七、消息历史:多轮对话的核心

LLM 本身是无状态的——每次调用都是独立的。要做多轮对话,必须手动把"前几轮"传回去。Pydantic AI 的做法:

agent = Agent("openai:gpt-4o-mini", system_prompt="你是私人助手,记住用户的信息。")

# 第一轮
r1 = await agent.run("我叫小王,今年 28。")
print(r1.output)
# "你好小王,我记住啦。"

# 第二轮——把第一轮的消息传进去
r2 = await agent.run(
    "我多大了?",
    message_history=r1.all_messages(),
)
print(r2.output)
# "你 28 岁。"

# 第三轮
r3 = await agent.run(
    "那明年呢?",
    message_history=r2.all_messages(),
)
print(r3.output)
# "明年你就 29 岁。"

核心操作就一步:run(..., message_history=上一次的 all_messages())

all_messages() vs new_messages()

result.all_messages()
本次 run 的全部消息,包括传入的 message_history。下一轮对话直接用这个。
result.new_messages()
本次 run 新增的消息(不含 message_history)。适合做增量持久化——你已经存了之前的,只需把新增 append 进去。

八、持久化消息历史:Redis/DB 示例

Pydantic AI 提供了消息的 JSON 序列化帮手,可以直接落库:

from pydantic_ai.messages import ModelMessagesTypeAdapter
import json

# 序列化
msgs = r1.all_messages()
raw = ModelMessagesTypeAdapter.dump_json(msgs).decode()
await redis.set(f"conv:{session_id}", raw)

# 反序列化
raw = await redis.get(f"conv:{session_id}")
history = ModelMessagesTypeAdapter.validate_json(raw)
r2 = await agent.run("...", message_history=history)

这是聊天机器人最核心的状态管理模式——每个会话一个 Redis key,增量 append,超过 N 轮就摘要后压缩

九、跨 Agent 共享历史

Agent A 和 Agent B 是两个不同 Agent(不同 prompt、不同工具),但都属于同一个对话,历史要延续——message_history 就是答案:

analyst = Agent("openai:gpt-4o", system_prompt="你是数据分析师")
writer = Agent("openai:gpt-4o", system_prompt="你是文案作家")

r1 = await analyst.run("分析一下这组数据...")
r2 = await writer.run("基于分析结果写段公众号文案", message_history=r1.new_messages())
跨 Agent 时,system_prompt 会重新注入 当你把 A 的 new_messages 传给 B,B 自己的 system_prompt 会作为新的 system 消息附加进去——老的 system prompt 不会被删。为避免两个 system prompt 互相干扰,推荐用 new_messages() 而不是 all_messages()

十、截断与窗口:历史长到爆炸怎么办

多轮对话 30 轮以后,消息列表可能几千个 token——又贵又慢。两种常见策略:

策略 A:固定窗口

只保留最近 N 条消息:

def trim(msgs, keep=20):
    if len(msgs) <= keep:
        return msgs
    # 保留第一条(通常是 system)和最后 keep-1 条
    return [msgs[0]] + msgs[-(keep - 1):]

trimmed = trim(history)
r = await agent.run("...", message_history=trimmed)

策略 B:摘要压缩

到达阈值时,让一个"摘要 Agent"把前若干轮压成一段 system prompt,历史从头清空:

summarizer = Agent(
    "openai:gpt-4o-mini",
    system_prompt="你是会议记录员,把对话浓缩成 100 字关键事实列表。",
    output_type=str,
)

async def compact(history, threshold=30):
    if len(history) < threshold:
        return history
    # 取前 80% 让摘要 Agent 压缩
    old = history[:int(len(history) * 0.8)]
    recent = history[int(len(history) * 0.8):]
    s = await summarizer.run(
        "对话历史:\n" + serialize(old),
    )
    # 构造一个虚拟 system 消息 + 保留近期
    return [make_system_msg(f"之前的对话摘要: {s.output}")] + recent

十一、流式 + 历史组合:真正的聊天体验

这是最终形态——用 run_stream 吐流,结束后 all_messages() 存库,下一轮再拿出来:

async def chat_turn(session_id: str, user_msg: str):
    history_raw = await redis.get(f"conv:{session_id}")
    history = ModelMessagesTypeAdapter.validate_json(history_raw) if history_raw else []

    async with agent.run_stream(user_msg, message_history=history) as response:
        async for chunk in response.stream_text(delta=True):
            yield chunk            # 边传给前端边推

        # 流结束后持久化新一轮 messages
        new_history = response.all_messages()
        await redis.set(
            f"conv:{session_id}",
            ModelMessagesTypeAdapter.dump_json(new_history),
            ex=3600,
        )

十二、八个常见坑

  1. run_stream 忘 async with:底层连接泄漏,高并发几十次就把 socket 耗尽。
  2. 流迭代中途抛异常:确保有 try/finally(或信任 async with 的上下文管理来关闭)。
  3. delta=True 但前端期望累积:两边语义对不上,显示错位。明确约定一方。
  4. 结构化流没做 debounce:partial 可能一秒几百次,前端卡死。debounce_by=0.05 起步。
  5. message_history 传了但 system prompt 也重算了:多段 system 互相干扰。同一 Agent 跨轮传 history 一般不会出问题(框架识别),但跨 Agent 时用 new_messages() 更干净。
  6. 历史不截断:token 随轮数线性增长,成本飙升。一定要有 trim 或摘要策略。
  7. 把 history 塞进 system_prompt 字符串:自制拼接,丢失 tool call/tool return 的结构。用 message_history 参数,框架会正确还原。
  8. 流式场景下校验失败重试:一次流被中断、重新开始——不是你记的字符数对不上。输出结构必须严的场景,建议非流式 + 前端伪装打字机。

十三、本章小结

记住:
run_streamasync with,stream_text(delta=True) 吐增量,stream_structured 吐部分结构化结果。
② 多轮对话 = 把上一次的 all_messages() 作为下一次 message_history
③ 历史用 ModelMessagesTypeAdapter 序列化成 JSON 存 Redis/DB,下一次反序列化即可。
④ 长对话必须 trim 或摘要,token 线性增长是灾难。