Chapter 10

Hands-on: Wiring the Customer Service Agent into End-to-End Observability

The previous nine chapters took the machine apart; this chapter puts it back together. Using the customer service Agent from the ai-agent tutorial as the base, we wire up the full pipeline end to end, from traces to alerts.

Target scenario

A typical customer service bot, with an architecture like this:

User ──► FastAPI /chat
              │
              ▼
     ┌─────────────────┐
     │ classify intent │  (LLM, gpt-4o-mini)
     └──────┬──────────┘
            │
  ┌─────────┴──────────┐
  ▼                    ▼
order lookup      retrieval (vector DB)
  │                    │
  └─────────┬──────────┘
            ▼
    answer LLM (gpt-4o)
            │
            ▼
     reply to the user

In Langfuse we want to be able to see every turn as a full trace with its model calls and token costs, manage both prompts by versioned label, gate releases with a regression dataset in CI, sample production traffic with an LLM judge, attach user feedback as scores, and get alerted when quality drops.

Project layout

customer-bot/
├── app/
│   ├── main.py          # FastAPI entrypoint
│   ├── agent.py         # conversation core
│   ├── tools.py         # order_lookup / retrieval
│   └── config.py
├── prompts/
│   └── bootstrap.py     # one-off: create the prompts in Langfuse
├── scripts/
│   ├── run_regression.py
│   ├── judge_online.py  # sampled online scoring (cron / k8s CronJob)
├── benchmarks/
│   └── latest.json      # baseline
├── .github/workflows/regression.yml
├── Dockerfile
└── requirements.txt
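config.py and requirements.txt are listed above but not reproduced in this chapter. A minimal sketch of app/config.py (nothing project-specific, just the environment variables the Langfuse and OpenAI SDKs read) might look like this:

# app/config.py -- illustrative sketch, not the original file
import os

LANGFUSE_HOST = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
LANGFUSE_PUBLIC_KEY = os.environ["LANGFUSE_PUBLIC_KEY"]   # pk-lf-...
LANGFUSE_SECRET_KEY = os.environ["LANGFUSE_SECRET_KEY"]   # sk-lf-...
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]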

Step 1: Upload the prompts to Langfuse

prompts/bootstrap.py, run once from CI:

from langfuse import Langfuse
lf = Langfuse()

lf.create_prompt(
    name="cs-classify",
    type="chat",
    prompt=[
        {"role": "system",
         "content": "请把用户问题分类, 只返回 JSON: {\"intent\": \"order|logistics|refund|other\"}"},
        {"role": "user", "content": "{{query}}"},
    ],
    labels=["production", "latest"],
    config={"model": "gpt-4o-mini", "temperature": 0.0},
)

lf.create_prompt(
    name="cs-answer",
    type="chat",
    prompt=[
        {"role": "system",
         "content": "你是客服小古。用户 {{user_name}} (等级 {{tier}})。"
                    "只用给定上下文回答, 不确定说不知道。\n上下文:\n{{context}}"},
        {"role": "user", "content": "{{query}}"},
    ],
    labels=["production", "latest"],
    config={"model": "gpt-4o", "temperature": 0.2},
)
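After the bootstrap run, a quick way to confirm the labels resolve is to fetch a prompt back by label:

# sanity check after bootstrap: fetch by label and inspect version/config
from langfuse import Langfuse

lf = Langfuse()
p = lf.get_prompt("cs-classify", label="production")
print(p.version, p.config)   # e.g. 1 {'model': 'gpt-4o-mini', 'temperature': 0.0}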

Step 2: Instrument the Agent with @observe

app/agent.py, using the Langfuse decorators:

import json

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from langfuse.openai import openai  # drop-in wrapper that traces OpenAI calls automatically

# assumed to live in app/tools.py (not reproduced in this chapter)
from .tools import vector_search, db, extract_order_id

lf = Langfuse()

@observe(name="retrieval")
def retrieve(query: str) -> list[str]:
    return vector_search(query, top_k=4)

@observe(name="order-lookup")
def order_lookup(order_id: str) -> dict:
    return db.get_order(order_id)

@observe(name="classify")
def classify(query: str) -> str:
    p = lf.get_prompt("cs-classify", label="production")
    langfuse_context.update_current_observation(prompt=p)
    msgs = p.compile(query=query)
    rsp = openai.chat.completions.create(
        model=p.config["model"], temperature=p.config["temperature"],
        messages=msgs, response_format={"type": "json_object"},
        langfuse_prompt=p,
    )
    return json.loads(rsp.choices[0].message.content)["intent"]

@observe(name="answer")
def answer(query: str, context: str, user_name: str, tier: str) -> str:
    p = lf.get_prompt("cs-answer", label="production")
    msgs = p.compile(query=query, context=context, user_name=user_name, tier=tier)
    rsp = openai.chat.completions.create(
        model=p.config["model"], temperature=p.config["temperature"],
        messages=msgs, langfuse_prompt=p,
    )
    return rsp.choices[0].message.content

@observe(name="chat-turn")
def chat(query: str, user_id: str, session_id: str, user_name: str, tier: str):
    # attach the full set of trace-level metadata
    langfuse_context.update_current_trace(
        user_id=user_id,
        session_id=session_id,
        tags=[f"tier:{tier}", "env:prod", "ver:2026-05"],
        metadata={"channel": "web"},
        # an explicit trace input lets the online judge in Step 5 read the query back
        input={"query": query},
    )

    intent = classify(query)
    if intent == "order":
        order_id = extract_order_id(query)
        ctx = str(order_lookup(order_id)) if order_id else ""
    else:
        ctx = "\n".join(retrieve(query))

    return answer(query, ctx, user_name, tier)
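vector_search, db and extract_order_id are assumed to come from app/tools.py, which the chapter does not reproduce. Roughly, the shapes the agent code expects look like this (illustrative stubs, not the real implementations):

# app/tools.py -- illustrative stubs only
import re

def vector_search(query: str, top_k: int = 4) -> list[str]:
    # query the vector DB and return the top_k chunks as plain strings
    ...

def extract_order_id(query: str) -> str | None:
    # naive pattern match; assumes order ids look like "ORD-12345"
    m = re.search(r"ORD-\d+", query)
    return m.group(0) if m else None

class _OrderDB:
    def get_order(self, order_id: str) -> dict:
        # fetch the order record from your order service / database
        ...

db = _OrderDB()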

Step 3: The FastAPI entrypoint

# app/main.py
import uuid

from fastapi import FastAPI, Header
from pydantic import BaseModel
from langfuse.decorators import langfuse_context

from .agent import chat, lf

app = FastAPI()

class ChatIn(BaseModel):
    query: str
    session_id: str

@app.post("/chat")
def chat_endpoint(body: ChatIn, x_user: str = Header()):
    user = load_user(x_user)
    answer = chat(body.query, x_user, body.session_id, user.name, user.tier)
    # 回 trace_id 给前端, 下面的反馈 API 要用
    trace_id = langfuse_context.get_current_trace_id()
    return {"answer": answer, "trace_id": trace_id}

@app.post("/feedback")
def feedback(trace_id: str, thumbs: str):
    lf.score(
        trace_id=trace_id, name="user_feedback",
        value=1.0 if thumbs == "up" else 0.0,
        data_type="BOOLEAN", source="ANNOTATION",
    )
    return {"ok": True}

@app.on_event("shutdown")
def _flush():
    lf.flush()
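To see how the trace_id ties the two endpoints together, here is a hypothetical client round trip (the local dev URL and user id are assumptions):

# hypothetical frontend round trip against a local dev server
import requests

BASE = "http://localhost:8000"

r = requests.post(
    f"{BASE}/chat",
    json={"query": "Where is my order ORD-42?", "session_id": "s-1"},
    headers={"x-user": "u-123"},
)
body = r.json()
print(body["answer"])

# the user clicks thumbs-up; attach the feedback to the same trace
requests.post(f"{BASE}/feedback",
              params={"trace_id": body["trace_id"], "thumbs": "up"})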

Step 4: Regression set + CI

First create the dataset with scripts/seed_dataset.py (run it only once; a sketch of load_seed_cases follows the script):

from langfuse import Langfuse
lf = Langfuse()
lf.create_dataset(name="cs-regression-v1")

for q, a in load_seed_cases():
    lf.create_dataset_item(
        dataset_name="cs-regression-v1",
        input={"query": q},
        expected_output={"answer": a},
    )
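load_seed_cases is a hypothetical helper; for instance it could read query/answer pairs from a JSONL file kept next to the script:

# hypothetical helper: read (query, expected answer) pairs from a JSONL file
import json

def load_seed_cases(path: str = "scripts/seed_cases.jsonl"):
    with open(path, encoding="utf-8") as f:
        for line in f:
            case = json.loads(line)
            yield case["query"], case["answer"]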

CI runs the regression, scripts/run_regression.py:

import os, sys, json
from langfuse import Langfuse
from langfuse.decorators import langfuse_context
from app.agent import chat

lf = Langfuse()
ds = lf.get_dataset("cs-regression-v1")
run_name = f"ci-{os.environ.get('GITHUB_SHA','local')[:7]}"

scores = []
for item in ds.items:
    # item.observe() links the trace produced by the @observe-decorated chat() to this run
    with item.observe(run_name=run_name) as trace_id:
        out = chat(item.input["query"], "ci-user", "ci-session", "QA", "test")
        # crude keyword-overlap check; in real production, plug in an LLM-as-Judge evaluator
        keywords = item.expected_output["answer"].split()
        hit = sum(1 for k in keywords if k in out) / max(len(keywords), 1)
        lf.score(trace_id=trace_id, name="keyword_hit", value=hit)
        scores.append(hit)

langfuse_context.flush()  # the decorator traces go through a separate client, flush it too
lf.flush()

avg = sum(scores) / len(scores)
baseline = json.load(open("benchmarks/latest.json"))
print(f"avg={avg:.3f}  baseline={baseline['avg']:.3f}")

if avg < baseline["avg"] - 0.03:
    print("REGRESSION blocked"); sys.exit(1)

if os.environ.get("GITHUB_REF") == "refs/heads/main":
    json.dump({"avg": avg, "run": run_name}, open("benchmarks/latest.json", "w"))

Step 5: Online LLM-as-Judge sampling

A K8s CronJob samples prod traces every 10 minutes and writes scores back. scripts/judge_online.py:

import json
from langfuse import Langfuse
from datetime import datetime, timedelta, timezone
import openai

lf = Langfuse()
end = datetime.now(timezone.utc)
start = end - timedelta(minutes=10)

traces = lf.fetch_traces(
    from_timestamp=start, to_timestamp=end,
    tags=["env:prod"], limit=500,
)

# braces in the JSON example are doubled so .format() leaves them literal
JUDGE = """You are a strict evaluator. Decide whether the answer properly addresses the query.
Return JSON: {{"score": 0.0-1.0, "reason": "..."}}
query: {q}
answer: {a}"""

for t in traces.data[::5]:  # 20% sample
    q = (t.input or {}).get("query")
    a = t.output or ""
    if not q or not a: continue
    rsp = openai.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[{"role": "user", "content": JUDGE.format(q=q, a=a)}],
    )
    r = json.loads(rsp.choices[0].message.content)
    lf.score(trace_id=t.id, name="relevancy_judge",
             value=float(r["score"]), comment=r["reason"])

lf.flush()

And the CronJob manifest:

# k8s/cronjob-judge.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: langfuse-online-judge
spec:
  schedule: "*/10 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: judge
              image: your-registry/customer-bot:latest
              command: ["python", "scripts/judge_online.py"]
              envFrom:
                - secretRef: { name: langfuse-keys }
                - secretRef: { name: openai-key }

Step 6: Alerts

In the UI, Alerts → New Alert:

Plus one extreme case:

Step 7: Release process

  1. Product edits the cs-answer prompt in the UI and saves; the new version gets the latest label
  2. Engineering tags the new version with staging; the staging environment's SDK pulls label=staging (see the sketch after this list)
  3. Run one pass of the regression script against staging; the average score must not drop
  4. Ship to production traffic for one day and watch user_feedback and relevancy_judge in the UI
  5. If neither regresses, move the production label from the old version to the new one in the UI
  6. Once the SDK's 60s cache expires, all traffic is on the new version; if anything looks off, move the label back to the old version for a near-instant rollback
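One way to implement step 2 is to make the prompt label an environment setting, so staging and production pull different versions of the same prompt. PROMPT_LABEL below is an assumed variable, not part of the project above:

# sketch: let each environment choose which prompt label it pulls
import os
from langfuse import Langfuse

lf = Langfuse()
PROMPT_LABEL = os.environ.get("PROMPT_LABEL", "production")  # set to "staging" in the staging cluster

def get_answer_prompt():
    # get_prompt caches client-side (TTL defaults to 60s), which is why the
    # label move in step 6 takes effect for everyone within about a minute.
    return lf.get_prompt("cs-answer", label=PROMPT_LABEL)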

What a day looks like in the UI

9 a.m.
Dashboard: 100k traces yesterday, total cost $8.4, average relevancy 0.82, user_feedback thumbs-up rate 78%. All quiet.
11 a.m.
Slack alert: the relevancy_judge average over the past hour dropped to 0.71. Open Traces, filter by tag + time window, and the failures cluster on tier:free users where retrieval recall is poor. Copy three bad traces into the dataset, pending review.
2 p.m.
Product ships cs-answer v9, adding a line asking the user to attach a screenshot if the order cannot be located. Canary it on staging; the regression average rises from 0.82 to 0.85, QA signs off, promote to production.
5 p.m.
v9 has been live for three hours and the user_feedback thumbs-up rate is up 4 points. In the Sessions panel, the longest conversation runs 18 turns, all tied to the same session_id, so context is being threaded correctly.

Where to go from here

A closing word
"Observability" is not something a tool gives you once it is installed. It is a set of habits: look at the trace first when something breaks, run the regression before you ship, review the Dashboard trends every week. Langfuse is the vehicle for those habits; what actually keeps them running is you.

Chapter recap