Chapter 09

Production Deployment: Caching, Observability, Disaster Recovery

In development, DSPy feels like a notebook; in production it has to fly like an aircraft: rate limiting, graceful degradation, observability, rollback. This chapter turns "it runs" into "it keeps running reliably".

dspy.settings: The Global Configuration Hub

import dspy

dspy.settings.configure(
    lm=dspy.LM("openai/gpt-4o-mini", cache=True, max_tokens=1024),
    rm=dspy.ColBERTv2(url="http://retriever.internal:8893"),
    track_usage=True,          # token accounting
    max_backtrack=3,           # Assertion retry count
    trace=[],                  # trace buffer; swap in a custom implementation if needed
)

Every Module call reads this global configuration. For a local override (e.g. disabling the cache during evaluation), use dspy.context:

with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False, temperature=0)):
    score = evaluate(module, devset=devset)

Caching: Three Layers of Acceleration

| Layer | Purpose | Configuration |
| --- | --- | --- |
| LM in-memory cache | identical prompts within one process return instantly | LM(..., cache=True) |
| LM disk cache | reuse across processes/restarts | export DSPY_CACHEDIR=/mnt/cache/dspy |
| Business-level cache | hits keyed on user+question | roll your own with Redis, wrapped around the Module |

import redis, hashlib, json
r = redis.Redis(host="redis.internal")

def cached_ask(question: str) -> str:
    key = "qa:" + hashlib.sha256(question.encode()).hexdigest()[:16]
    if (hit := r.get(key)):
        return json.loads(hit)["answer"]
    pred = rag(question=question)
    r.setex(key, 86400, json.dumps({"answer": pred.answer}))
    return pred.answer

Two moments when caches must be invalidated
① After a recompile the prompt changes, so the in-memory/disk cache keys change and stale entries expire automatically.
② The business cache needs an explicit TTL. Otherwise a modified Module keeps serving old answers, which is painful to debug.
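One way to make ① cover the business cache too is to fold a fingerprint of the compiled artifact into every Redis key, so a recompile invalidates everything at once. A sketch, assuming the artifacts/rag_current.json layout used later in this chapter:

```python
import hashlib
from pathlib import Path

ARTIFACT = Path("artifacts/rag_current.json")  # the deployed compile artifact

def artifact_version() -> str:
    """Short fingerprint of the compiled program; changes on every recompile."""
    return hashlib.sha256(ARTIFACT.read_bytes()).hexdigest()[:8]

def cache_key(question: str) -> str:
    # A recompile changes artifact_version(), so every old key misses at once;
    # the TTL then garbage-collects the orphaned entries.
    digest = hashlib.sha256(question.encode()).hexdigest()[:16]
    return f"qa:{artifact_version()}:{digest}"
```

Swap this in for the `key = "qa:" + ...` line in cached_ask; keep the TTL as a safety net.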

Observability: Callback Instrumentation

from dspy.utils.callback import BaseCallback
import time, logging, json

log = logging.getLogger("dspy.obs")

class ObsCallback(BaseCallback):
    def on_module_start(self, call_id, instance, inputs):
        self._t = time.time()

    def on_module_end(self, call_id, outputs, exception):
        dur = (time.time() - self._t) * 1000
        usage = dspy.settings.usage_tracker.get_total_tokens() if dspy.settings.track_usage else {}
        log.info(json.dumps({
            "call_id": call_id,
            "duration_ms": round(dur),
            "usage": usage,
            "err": str(exception) if exception else None,
        }))

dspy.settings.configure(callbacks=[ObsCallback()])

Hooking into OpenTelemetry is just a matter of opening a span inside the callback:

from opentelemetry import trace
tracer = trace.get_tracer("dspy")

class OtelCallback(BaseCallback):
    def on_module_start(self, call_id, instance, inputs):
        self.ctx = tracer.start_as_current_span(type(instance).__name__)
        self.ctx.__enter__()

    def on_module_end(self, call_id, outputs, exception):
        if exception:
            trace.get_current_span().record_exception(exception)
        self.ctx.__exit__(None, None, None)

Cost Monitoring

from prometheus_client import Counter

TOKENS = Counter("dspy_tokens_total", "tokens used", ["model", "kind"])
REQS   = Counter("dspy_requests_total", "LLM calls", ["model", "status"])

class CostCallback(BaseCallback):
    def on_lm_end(self, call_id, outputs, exception):
        usage = outputs.get("usage", {})
        model = outputs.get("model", "unknown")
        TOKENS.labels(model, "prompt").inc(usage.get("prompt_tokens", 0))
        TOKENS.labels(model, "completion").inc(usage.get("completion_tokens", 0))
        REQS.labels(model, "error" if exception else "ok").inc()

Then build your Grafana dashboards on top of these counters.
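Grafana needs Prometheus to scrape these counters from somewhere. A minimal sketch: expose the default registry over HTTP and derive an estimated-spend counter. The port and the per-1K-token prices are assumptions; substitute your provider's real price sheet.

```python
from prometheus_client import Counter, start_http_server

start_http_server(9100)  # serves /metrics on :9100 (arbitrary port choice)

EST_COST = Counter("dspy_cost_usd_total", "estimated USD spend", ["model"])

# Hypothetical per-1K-token prices; look up your provider's actual pricing.
PRICE = {"openai/gpt-4o-mini": {"prompt": 0.00015, "completion": 0.0006}}

def record_cost(model: str, prompt_toks: int, completion_toks: int) -> None:
    p = PRICE.get(model, {"prompt": 0.0, "completion": 0.0})
    EST_COST.labels(model).inc(
        prompt_toks / 1000 * p["prompt"] + completion_toks / 1000 * p["completion"]
    )
```

record_cost can be called from CostCallback.on_lm_end right next to the token counters.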

Multi-Model Fallback

class FallbackLM:
    def __init__(self, primary, secondary):
        self.primary = primary
        self.secondary = secondary

    def __call__(self, prompt, **kw):
        try:
            return self.primary(prompt, **kw)
        except Exception as e:
            log.warning("primary failed: %s, falling back", e)
            return self.secondary(prompt, **kw)

lm = FallbackLM(
    primary=dspy.LM("openai/gpt-4o-mini", num_retries=2),
    secondary=dspy.LM("anthropic/claude-haiku-4-5"),
)
dspy.configure(lm=lm)

A fallback caveat
Different models can format output for the same prompt very differently. Put a hard constraint in the Signature, such as desc="the output must be JSON", and ship only after both models pass format validation.
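That format check can be enforced in the fallback itself: reject an answer that does not parse, not just one that errors. A sketch building on the FallbackLM idea (the JSON check is deliberately minimal):

```python
import json
import logging

log = logging.getLogger("dspy.fallback")

class ValidatedFallbackLM:
    """Try primary, then secondary; a model's answer is accepted only if the
    call succeeds AND the output parses as JSON (guards against format drift)."""

    def __init__(self, primary, secondary):
        self.primary, self.secondary = primary, secondary

    def __call__(self, prompt, **kw):
        for name, lm in (("primary", self.primary), ("secondary", self.secondary)):
            try:
                out = lm(prompt, **kw)
                # DSPy LMs typically return a list of completions; handle both.
                json.loads(out[0] if isinstance(out, list) else out)
                return out
            except Exception as e:
                log.warning("%s rejected: %s", name, e)
        raise RuntimeError("all models failed format validation")
```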

Rate Limiting and Circuit Breaking

import asyncio
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(max_rate=30, time_period=1)   # 30 QPS

@app.post("/ask")
async def ask(q: Query):
    async with limiter:
        pred = await rag.acall(question=q.question)
    return {"answer": pred.answer}

For circuit breaking, wrap the LM call with pybreaker: after N consecutive failures the breaker opens and short-circuits to a degraded response instead of hammering an already-struggling downstream.
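pybreaker gives you this off the shelf; to show what the breaker actually does, here is a minimal hand-rolled version (an illustration of the pattern, not the pybreaker API):

```python
import time

class CircuitBreaker:
    """After `fail_max` consecutive failures, short-circuit all calls for
    `reset_timeout` seconds, then let one probe call through (half-open)."""

    def __init__(self, fail_max: int = 5, reset_timeout: float = 30.0):
        self.fail_max, self.reset_timeout = fail_max, reset_timeout
        self.failures, self.opened_at = 0, None

    def call(self, fn, *args, **kw):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: serve degraded response")
            self.opened_at = None  # half-open: allow one probe call
        try:
            result = fn(*args, **kw)
        except Exception:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.opened_at = time.time()  # trip the breaker
            raise
        self.failures = 0  # any success closes the breaker again
        return result
```

In the open state the caller gets the RuntimeError immediately, so the downstream LM sees zero traffic while it recovers.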

Hardening FastAPI for Production

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="dspy-rag", version="v3.1")

app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"])

@app.on_event("startup")
async def warmup():
    dspy.configure(lm=lm, rm=rm, callbacks=[ObsCallback(), CostCallback()])
    app.state.rag = RAG()
    app.state.rag.load("artifacts/rag_current.json")
    # dry-run once to initialize the LM client and warm the prompt cache
    await app.state.rag.acall(question="health check")

@app.get("/ready")
def ready():
    if not hasattr(app.state, "rag"):
        raise HTTPException(503, "not ready")
    return {"status": "ok", "version": app.version}

/ready backs the k8s readinessProbe; /healthz only checks that the process is alive. The pod takes no traffic until warmup finishes, so the first real request never pays the cold start.

Blue-Green and Canary Releases

  1. Push the new compiled artifact rag_v4.json to staging and run the regression suite
  2. On pass, start a canary: 10% of traffic, comparing v3 vs. v4 p95 latency + scores in Prometheus
  3. No regression after 24h → ramp to 50% → 100%
  4. Regression → repoint the symlink to v3; no pod restart needed (just reload, as in startup)

import os

@app.post("/admin/reload")
def reload(token: str):
    if token != os.environ["ADMIN_TOKEN"]:
        raise HTTPException(401)
    new = RAG()
    new.load("artifacts/rag_current.json")
    app.state.rag = new    # atomically swap the reference
    return {"reloaded": True}
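Step 4's rollback, repointing the symlink, can itself be made atomic so a concurrent reload never sees a half-missing file. A sketch (paths are illustrative):

```python
import os

def point_current(target: str, link: str = "artifacts/rag_current.json") -> None:
    """Atomically repoint the 'current' symlink (e.g. back to rag_v3.json)."""
    tmp = link + ".tmp"
    os.symlink(target, tmp)   # create the new link under a temp name
    os.replace(tmp, link)     # rename(2) is atomic on POSIX
```

After point_current("rag_v3.json"), hitting /admin/reload picks up the old artifact with no pod restart.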

Disaster Recovery Checklist

| Failure | Detection | Recovery |
| --- | --- | --- |
| Upstream LLM outage | fallback rate spikes | auto-switch to secondary, page the on-call |
| Compiled-artifact quality regression | canary score 5% below baseline | symlink back to the previous SHA, rerun compile |
| Hitting rate limits | 429 ratio > 1% | lower num_threads + add a rate limiter |
| Disk cache bloat | DSPY_CACHEDIR > 10GB | prune periodically, keep only 7 days |
| Prompt cost leak | one user's token usage spikes | truncate requests at the app layer, cap max_tokens in the Signature |
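The disk-cache row can be automated with a small cron job. A sketch that keeps the 7-day window from the table (point it at your DSPY_CACHEDIR):

```python
import time
from pathlib import Path

def prune_cache(cache_dir: str, max_age_days: float = 7.0) -> int:
    """Delete cache files older than max_age_days; return how many were removed.
    Intended to run from cron, e.g. daily against $DSPY_CACHEDIR."""
    cutoff = time.time() - max_age_days * 86400
    removed = 0
    for path in Path(cache_dir).rglob("*"):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed
```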

Regression CI

# triggered by .github/workflows/dspy-regression.yml
from dspy.evaluate import Evaluate

def test_no_regression():
    rag = RAG()
    rag.load("artifacts/rag_current.json")
    score = Evaluate(devset=canary_set, metric=metric, num_threads=4)(rag)
    baseline = float(open("artifacts/baseline.txt").read())
    assert score >= baseline - 0.02, f"regressed by {baseline - score:.3f}"

canary_set is kept separate from train/val/test; it exists purely for online comparison and gains 20 freshly sampled production questions each week.

Log Redaction

import re

SENSITIVE = re.compile(r"(\d{11}|\d{4} \d{4} \d{4} \d{4})")

class RedactCallback(BaseCallback):
    def on_lm_end(self, call_id, outputs, exception):
        text = outputs.get("response", "")
        outputs["response"] = SENSITIVE.sub("[REDACTED]", text)

For compliance, replace phone numbers, bank card numbers, emails, and similar PII before logging. (The pattern above covers only 11-digit phone numbers and spaced card numbers; extend it for emails and any other formats you log.)

Chapter Summary