Chapter 07

流式输出:让用户看到"思考过程"

AI 产品的体验天花板,90% 取决于流式做得好不好。等 30 秒看完整答案的年代过去了——现在要逐字流、还要把"正在调工具""正在检索"都展示给用户看。

为什么流式这么重要

LLM 生成一个完整回答平均 3-10 秒。如果不流式,用户看到的是:

[用户发消息] ─── 8 秒白屏 ─── [完整答案刷出来]

流式后:

[用户发消息] ─ 300ms ─ [第一个字] ─── [逐字流出] ─── [完整] ↑ ↑ TTFT token/s

TTFT(Time To First Token)从 8 秒降到 300ms,感知速度提升一个数量级。LangGraph 比单纯 LLM 流式多一层——它还能流"节点进度"。

三种 stream_mode

# 通用签名
for chunk in app.stream(
    {"messages": [...]},
    config=config,
    stream_mode="values",    # or "updates" / "messages" / "debug"
):
    print(chunk)

① values:每步完成后,吐出完整 state

for chunk in app.stream(inp, config, stream_mode="values"):
    print(chunk["messages"][-1])
#  每跑一个节点就吐一次,chunk 是完整 state 快照
#  适合:前端只想看"最新消息",state 体积小

② updates:只吐增量

for chunk in app.stream(inp, config, stream_mode="updates"):
    print(chunk)
#  {"think": {"messages": [AIMessage(...)]}}
#  {"act":   {"messages": [ToolMessage(...)]}}
#  chunk 是 {node_name: update_dict},省流量、知道是谁更新

③ messages:LLM token 级流式

for token, meta in app.stream(inp, config, stream_mode="messages"):
    print(token.content, end="", flush=True)
    # meta 包含 {"langgraph_node": "think", "langgraph_step": 1, ...}
组合模式
传 list 可同时拿多种:stream_mode=["updates", "messages"]。 chunk 是 (mode, payload) 元组,前端可按类型分发。

④ debug:全量事件流(调试用)

for evt in app.stream(inp, config, stream_mode="debug"):
    print(evt["type"], evt["payload"])
# type 有 task / task_result / checkpoint 等,生产别开,太吵

astream_events:最细粒度

想同时拿 LLM token + 工具调用 + 检索事件?用 astream_events:

async for event in app.astream_events(inp, config, version="v2"):
    kind = event["event"]
    name = event["name"]

    if kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")

    elif kind == "on_tool_start":
        print(f"\n🔧 调用工具 {name} 参数: {event['data']['input']}")

    elif kind == "on_tool_end":
        print(f"\n✓ {name} 返回: {event['data']['output']}")

    elif kind == "on_chain_start" and event["metadata"].get("langgraph_node"):
        print(f"\n▶ 进入节点: {event['metadata']['langgraph_node']}")

常用事件类型

event触发时机典型用途
on_chat_model_streamLLM 每个 token打字机效果
on_chat_model_endLLM 一次调用结束统计 tokens / latency
on_tool_start工具开始跑前端显示"正在查询..."
on_tool_end工具跑完把结果展示给用户
on_retriever_end检索完成展示引用文档
on_chain_start/end子链/节点进出打进度条

前端 SSE 对接

后端(FastAPI + SSE)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app_web = FastAPI()

@app_web.post("/chat")
async def chat(req):
    async def gen():
        cfg = {"configurable": {"thread_id": req.thread_id}}
        inp = {"messages": [("user", req.query)]}

        async for ev in app.astream_events(inp, cfg, version="v2"):
            kind = ev["event"]

            if kind == "on_chat_model_stream":
                yield f"data: {json.dumps({'type':'token','c':ev['data']['chunk'].content})}\n\n"

            elif kind == "on_tool_start":
                yield f"data: {json.dumps({'type':'tool_start','name':ev['name'],'input':ev['data']['input']})}\n\n"

            elif kind == "on_tool_end":
                yield f"data: {json.dumps({'type':'tool_end','name':ev['name'],'output':str(ev['data']['output'])})}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")

前端(原生 fetch + ReadableStream)

const res = await fetch("/chat", {
  method: "POST",
  body: JSON.stringify({query, thread_id}),
});

const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
let buf = "";

while (true) {
  const {done, value} = await reader.read();
  if (done) break;
  buf += value;

  const parts = buf.split("\n\n");
  buf = parts.pop();   // 末段可能不完整,留下

  for (const p of parts) {
    if (!p.startsWith("data: ")) continue;
    const body = p.slice(6);
    if (body === "[DONE]") return;

    const evt = JSON.parse(body);
    if (evt.type === "token")      appendText(evt.c);
    else if (evt.type === "tool_start") showToolBadge(evt.name);
    else if (evt.type === "tool_end")   removeToolBadge(evt.name);
  }
}

三种 UX 形态

① ChatGPT 式:只流文本

stream_mode="messages" 直接打字机效果,工具调用用 loading 句式("正在查询订单...")掩盖掉。

② Perplexity 式:过程透明

on_tool_starton_retriever_end 也显示出来:

🔍 正在搜索 "LangGraph 教程"
  ├ 召回 5 篇文档
🧠 正在生成回答
  │ LangGraph 是一个用于构建……
  ✓ 完成(1.3s)

③ Devin 式:展开可点

工具调用可折叠,用户点开看输入/输出。和 astream_events 的 start/end 配对完美。

流式 + 中断组合

如果图里有 interrupt(),流到中断点会自然停下。前端要识别这种"流意外结束":

async for ev in app.astream_events(inp, cfg, version="v2"):
    ...

# 流结束后检查 state
snap = await app.aget_state(cfg)
if snap.tasks and any(t.interrupts for t in snap.tasks):
    # 需要人工,给前端推 { type: "needs_human", payload: ... }
    yield f"data: {json.dumps({'type':'needs_human','payload':snap.tasks[0].interrupts[0].value})}\n\n"

只流某个节点

有时你只想流"对外说话"的 LLM,不想让"内部 rewrite"的 token 也泄漏给用户。用 tags:

llm_public = ChatOpenAI(...).with_config(tags=["public"])
llm_internal = ChatOpenAI(...).with_config(tags=["internal"])

# 前端只订阅 public tag
async for ev in app.astream_events(inp, cfg, version="v2",
                                   include_tags=["public"]):
    ...

性能实践:不要让流式阻塞

常见坑

症状解法
拿不到 token 流只吐完整消息stream_mode="messages"astream_events
前端一下子刷全部看似不流式Nginx proxy_buffering off + 响应 header X-Accel-Buffering: no
SSE 断连无提示用户以为卡住加 heartbeat,前端 onerror 自动重连
astream_events v1 已弃警告刷屏显式传 version="v2"
中文被切半出现 SSE 要 UTF-8,别随便 slice bytes

本章小结