dspy.settings:全局配置中枢
import dspy dspy.settings.configure( lm=dspy.LM("openai/gpt-4o-mini", cache=True, max_tokens=1024), rm=dspy.ColBERTv2(url="http://retriever.internal:8893"), track_usage=True, # token 统计 max_backtrack=3, # Assertion 重试次数 trace=[], # trace buffer,可换自定义实现 )
任意 Module 调用都会读这份全局配置。需要局部覆盖(比如评估时禁 cache)用 dspy.context:
with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False, temperature=0)): score = evaluate(module, devset=devset)
缓存:三层加速
| 层级 | 作用 | 配置 |
|---|---|---|
| LM 内存缓存 | 同进程相同 prompt 直接返回 | LM(..., cache=True) |
| LM 磁盘缓存 | 跨进程/重启复用 | export DSPY_CACHEDIR=/mnt/cache/dspy |
| 业务级缓存 | 按 user+question 的语义命中 | 自己写 Redis,Module 外层包装 |
import redis, hashlib, json r = redis.Redis(host="redis.internal") def cached_ask(question: str) -> str: key = "qa:" + hashlib.sha256(question.encode()).hexdigest()[:16] if (hit := r.get(key)): return json.loads(hit)["answer"] pred = rag(question=question) r.setex(key, 86400, json.dumps({"answer": pred.answer})) return pred.answer
缓存失效的两个时机
① 重新 compile 后 prompt 变了,内存/磁盘缓存 key 变,自动失效
② 业务缓存要手动配 TTL。否则改了 Module 但旧答案继续返回,排查起来非常痛苦
① 重新 compile 后 prompt 变了,内存/磁盘缓存 key 变,自动失效
② 业务缓存要手动配 TTL。否则改了 Module 但旧答案继续返回,排查起来非常痛苦
观测:callback 埋点
from dspy.utils.callback import BaseCallback import time, logging, json log = logging.getLogger("dspy.obs") class ObsCallback(BaseCallback): def on_module_start(self, call_id, instance, inputs): self._t = time.time() def on_module_end(self, call_id, outputs, exception): dur = (time.time() - self._t) * 1000 usage = dspy.settings.usage_tracker.get_total_tokens() if dspy.settings.track_usage else {} log.info(json.dumps({ "call_id": call_id, "duration_ms": round(dur), "usage": usage, "err": str(exception) if exception else None, })) dspy.settings.configure(callbacks=[ObsCallback()])
接入 OpenTelemetry 只要在 callback 里起 span:
from opentelemetry import trace tracer = trace.get_tracer("dspy") class OtelCallback(BaseCallback): def on_module_start(self, call_id, instance, inputs): self.ctx = tracer.start_as_current_span(type(instance).__name__) self.ctx.__enter__() def on_module_end(self, call_id, outputs, exception): if exception: trace.get_current_span().record_exception(exception) self.ctx.__exit__(None, None, None)
成本监控
from prometheus_client import Counter TOKENS = Counter("dspy_tokens_total", "tokens used", ["model", "kind"]) REQS = Counter("dspy_requests_total", "LLM calls", ["model", "status"]) class CostCallback(BaseCallback): def on_lm_end(self, call_id, outputs, exception): usage = outputs.get("usage", {}) model = outputs.get("model", "unknown") TOKENS.labels(model, "prompt").inc(usage.get("prompt_tokens", 0)) TOKENS.labels(model, "completion").inc(usage.get("completion_tokens", 0)) REQS.labels(model, "error" if exception else "ok").inc()
再在 Grafana 上画三张图:
- 按模型分组的 token 消耗速率 → 预算预警
- 调用成功率 → SLO 看板
- p50 / p95 / p99 latency → 性能退步早发现
多模型 fallback
class FallbackLM: def __init__(self, primary, secondary): self.primary = primary self.secondary = secondary def __call__(self, prompt, **kw): try: return self.primary(prompt, **kw) except Exception as e: log.warning("primary failed: %s, falling back", e) return self.secondary(prompt, **kw) lm = FallbackLM( primary=dspy.LM("openai/gpt-4o-mini", num_retries=2), secondary=dspy.LM("anthropic/claude-haiku-4-5"), ) dspy.configure(lm=lm)
Fallback 的警告
不同模型对同一个 prompt 的输出格式可能差很多。Signature 里用
不同模型对同一个 prompt 的输出格式可能差很多。Signature 里用
desc="输出必须是 JSON" 这种强约束,两边都能过格式校验再上。
限流与熔断
import asyncio from aiolimiter import AsyncLimiter limiter = AsyncLimiter(max_rate=30, time_period=1) # 30 QPS @app.post("/ask") async def ask(q: Query): async with limiter: pred = await rag.acall(question=q.question) return {"answer": pred.answer}
熔断用 pybreaker 包装 LM 调用,连续 N 次失败后进入 open 状态,直接短路返回降级答复,避免把下游打挂。
FastAPI 生产硬化
from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware app = FastAPI(title="dspy-rag", version="v3.1") app.add_middleware(CORSMiddleware, allow_origins=["https://app.example.com"]) @app.on_event("startup") async def warmup(): dspy.configure(lm=lm, rm=rm, callbacks=[ObsCallback(), CostCallback()]) app.state.rag = RAG() app.state.rag.load("artifacts/rag_current.json") # dry-run 一次触发 LM client 初始化和 prompt cache 预热 await app.state.rag.acall(question="health check") @app.get("/ready") def ready(): if not hasattr(app.state, "rag"): raise HTTPException(503, "not ready") return {"status": "ok", "version": app.version}
/ready 供 k8s readinessProbe 使用,/healthz 只看进程是否存活。warmup 之前 pod 不接流量,避免首个请求 cold start。
蓝绿与灰度
- 新编译产物
rag_v4.json先推到 staging 跑 regression - 通过后进入 canary:10% 流量,Prometheus 对比 v3/v4 的 p95 + 分数
- 24h 无退步 → 切到 50% → 100%
- 有退步 → 软链指回 v3,pod 无需重启(startup 里 reload 即可)
@app.post("/admin/reload") def reload(token: str): if token != os.environ["ADMIN_TOKEN"]: raise HTTPException(401) new = RAG() new.load("artifacts/rag_current.json") app.state.rag = new # 原子替换引用 return {"reloaded": True}
灾难恢复清单
| 故障 | 检测 | 恢复手段 |
|---|---|---|
| 上游 LLM 宕机 | fallback 率飙升 | 自动切 secondary,告警到值班 |
| 编译产物质量退步 | canary 分数低于 baseline 5% | 软链回滚到上一 SHA,重跑 compile |
| rate-limit 撞墙 | 429 比例 > 1% | 调小 num_threads + 加限流器 |
| 磁盘缓存膨胀 | DSPY_CACHEDIR > 10GB | 定期清,只保留 7 天 |
| prompt 泄漏成本 | 某 user token 暴涨 | 业务层加请求截断,Signature 里限 max_tokens |
回归 CI
# .github/workflows/dspy-regression.yml 触发 def test_no_regression(): rag = RAG(); rag.load("artifacts/rag_current.json") score = Evaluate(devset=canary_set, metric=metric, num_threads=4)(rag) baseline = float(open("artifacts/baseline.txt").read()) assert score >= baseline - 0.02, f"退步 {baseline-score:.3f}"
canary_set 独立于 train/val/test,专门用来在线对照,每周加 20 条新线上采样问题。
日志脱敏
import re SENSITIVE = re.compile(r"(\d{11}|\d{4} \d{4} \d{4} \d{4})") class RedactCallback(BaseCallback): def on_lm_end(self, call_id, outputs, exception): text = outputs.get("response", "") outputs["response"] = SENSITIVE.sub("[REDACTED]", text)
符合合规要求:手机号、银行卡、邮箱等打日志前先替换。
本章小结
- 三层缓存:LM 内存、磁盘、业务 Redis——越靠外命中率越高但也越难失效
- Callback 是观测入口:埋时延、token、错误 → Prometheus / OTel / 日志
- 多模型 fallback + 限流 + 熔断是生产三件套,缺一个就容易全链路雪崩
- 蓝绿靠软链 +
/admin/reload,回滚不用重启 pod - canary + 回归 CI 防止新 compile 偷偷退步