生产级 RAG 的挑战清单
一个能运行的 RAG Demo 距离生产级系统还有巨大的工程鸿沟。以下是将 RAG 推向生产时必须解决的核心问题:
多租户与权限
- 不同用户/部门只能访问各自的文档
- 行级权限控制(某用户只能看某类文件)
- 租户数据隔离(A 的检索不影响 B)
可靠性与性能
- LLM API 超时、限速的重试策略
- 高并发下的检索稳定性
- 大文档批量索引不阻塞查询服务
可观测性
- 每次查询的延迟分布和成本追踪
- 检索质量的在线监控
- 异常告警和根因分析
成本控制
- Embedding API 费用随文档量线性增长
- LLM 生成是最大成本中心
- 向量存储的内存成本
多租户架构设计
多租户 RAG 有三种常见的隔离策略,各有权衡:
| 策略 | 实现方式 | 隔离程度 | 成本 | 适合场景 |
|---|---|---|---|---|
| Collection 隔离 | 每个租户一个 Qdrant Collection | 完全隔离 | 高(内存线性增长) | 大企业客户,数量少(<100) |
| Payload 过滤 | 共享 Collection,用 tenant_id 过滤 | 逻辑隔离 | 低 | SaaS 场景,租户数量多 |
| 命名空间 | Pinecone Namespace / Qdrant 分组 | 中等 | 中 | 中等规模,简化管理 |
基于 Payload 过滤的多租户实现
from qdrant_client import QdrantClient, models
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Optional
import jwt
app = FastAPI()
client = QdrantClient(host="localhost", port=6333)
def get_tenant_id(authorization: str = Header(...)) -> str:
"""从 JWT Token 提取租户 ID"""
try:
token = authorization.replace("Bearer ", "")
payload = jwt.decode(token, "secret", algorithms=["HS256"])
return payload["tenant_id"]
except Exception:
raise HTTPException(status_code=401, detail="Invalid token")
def tenant_search(
query_vector: list[float],
tenant_id: str,
top_k: int = 5,
extra_filter: Optional[models.Filter] = None,
):
"""带租户隔离的向量检索"""
# 强制添加 tenant_id 过滤条件
tenant_filter = models.Filter(
must=[
models.FieldCondition(
key="tenant_id",
match=models.MatchValue(value=tenant_id)
)
]
)
# 如有额外过滤条件,合并
if extra_filter:
combined_filter = models.Filter(
must=[tenant_filter, extra_filter]
)
else:
combined_filter = tenant_filter
return client.search(
collection_name="shared_collection",
query_vector=query_vector,
query_filter=combined_filter,
limit=top_k,
with_payload=True,
)
@app.post("/query")
async def query(
question: str,
tenant_id: str = Depends(get_tenant_id)
):
# 只检索该租户的文档
q_vec = embed(question)
results = tenant_search(q_vec, tenant_id)
return generate_answer(question, results)
安全过滤与内容审核
from openai import OpenAI
openai_client = OpenAI()
class RAGSecurityLayer:
"""RAG 安全层:输入过滤 + 输出审核"""
BLOCKED_PATTERNS = [
"忽略之前的指令", "ignore previous instructions",
"扮演", "pretend you are",
"jailbreak", "越狱",
]
def check_prompt_injection(self, query: str) -> bool:
"""检测 Prompt 注入攻击"""
query_lower = query.lower()
return any(
pattern.lower() in query_lower
for pattern in self.BLOCKED_PATTERNS
)
def check_content_policy(self, text: str) -> bool:
"""使用 OpenAI Moderation API 审核内容"""
response = openai_client.moderations.create(input=text)
result = response.results[0]
return not result.flagged
def safe_query(self, query: str) -> str:
"""安全查询:输入检查 → RAG → 输出审核"""
# 1. Prompt 注入检测
if self.check_prompt_injection(query):
raise ValueError("检测到潜在的 Prompt 注入攻击")
# 2. 查询内容审核
if not self.check_content_policy(query):
raise ValueError("查询内容不符合使用政策")
# 3. 运行 RAG
answer = rag_chain.invoke(query)
# 4. 输出审核
if not self.check_content_policy(answer):
return "抱歉,我无法提供该问题的答案。"
return answer
可观测性:LangSmith 集成
import os
from langsmith import Client
# 设置 LangSmith 追踪(在 .env 文件中配置)
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"
# 所有 LangChain 调用会自动追踪,无需修改代码
# LangSmith Dashboard 显示:延迟、Token 使用、检索结果、LLM 输入输出
# 自定义追踪指标
from langsmith import traceable
@traceable(name="rag_query", tags=["production"])
def traced_rag_query(question: str, tenant_id: str) -> dict:
import time
start = time.time()
results = rag_chain.invoke(question)
return {
"answer": results,
"latency_ms": (time.time() - start) * 1000,
"tenant_id": tenant_id,
}
成本控制策略
from functools import lru_cache
import hashlib
import redis
import json
redis_client = redis.Redis(host="localhost", port=6379)
class CostOptimizedRAG:
"""成本优化的 RAG 实现"""
def __init__(self):
self.embed_cache = {} # 内存 Embedding 缓存
self.answer_ttl = 3600 # 答案缓存 1 小时
def cached_embed(self, text: str) -> list[float]:
"""Embedding 缓存,相同文本不重复请求 API"""
key = hashlib.md5(text.encode()).hexdigest()
if key not in self.embed_cache:
self.embed_cache[key] = embed(text)
return self.embed_cache[key]
def cached_answer(self, question: str, tenant_id: str) -> Optional[str]:
"""检查 Redis 缓存是否有该问题的答案"""
cache_key = f"rag:{tenant_id}:{hashlib.md5(question.encode()).hexdigest()}"
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
return None
def save_answer(self, question: str, tenant_id: str, answer: str):
cache_key = f"rag:{tenant_id}:{hashlib.md5(question.encode()).hexdigest()}"
redis_client.setex(cache_key, self.answer_ttl, json.dumps(answer))
def smart_model_selection(self, question: str) -> str:
"""根据问题复杂度选择不同档次的 LLM"""
# 简单问题用便宜模型
simple_keywords = ["是什么", "定义", "who is", "what is"]
if any(kw in question for kw in simple_keywords):
return "gpt-4o-mini" # 便宜 10x
# 复杂推理用强模型
return "gpt-4o"
# 成本估算工具
class CostTracker:
PRICES = {
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006}, # $/1K tokens
"gpt-4o": {"input": 0.0025, "output": 0.01},
"text-embedding-3-small": {"input": 0.00002, "output": 0},
}
def estimate_monthly_cost(self, queries_per_day: int, docs: int) -> dict:
# 假设:每次查询平均 1000 input + 300 output tokens
llm_cost = queries_per_day * 30 * (
1000 * self.PRICES["gpt-4o-mini"]["input"] / 1000 +
300 * self.PRICES["gpt-4o-mini"]["output"] / 1000
)
embed_cost = docs * 500 * self.PRICES["text-embedding-3-small"]["input"] / 1000
return {"monthly_llm_usd": llm_cost, "one_time_embed_usd": embed_cost}
混合存储:向量数据库 + 图数据库
复杂的企业知识图谱场景下,纯向量检索有局限:无法处理"找所有直接下属文件"这样的结构化关系查询。向量 + 图的混合架构可以解决多跳推理问题。
# 向量数据库:语义检索(Qdrant)
# 图数据库:关系遍历(Neo4j)
from neo4j import GraphDatabase
neo4j_driver = GraphDatabase.driver(
"bolt://localhost:7687",
auth=("neo4j", "password")
)
def hybrid_search(question: str) -> str:
"""
混合检索:
1. 向量检索找到相关文档节点
2. 图遍历扩展找相关联的文档
3. 合并后送入 LLM
"""
# 第一步:向量检索
q_vec = embed(question)
vector_hits = client.search("rag_knowledge_base", q_vec, limit=5)
seed_ids = [h.payload["doc_id"] for h in vector_hits]
# 第二步:图遍历扩展
with neo4j_driver.session() as session:
result = session.run("""
MATCH (d:Document)-[:REFERENCES|RELATED_TO*1..2]->(related)
WHERE d.doc_id IN $seed_ids
RETURN DISTINCT related.doc_id AS doc_id, related.content AS content
LIMIT 10
""", seed_ids=seed_ids)
graph_docs = [row["content"] for row in result]
# 第三步:合并所有上下文
all_contexts = (
[h.payload["text"] for h in vector_hits] + graph_docs
)
return generate_answer(question, all_contexts)
生产部署架构图
用户请求
│
▼
[API Gateway / 负载均衡]
│
├──→ [认证服务] → 提取 tenant_id
│
▼
[RAG API Server (FastAPI / 多实例)]
│
├──→ [安全过滤层] (Prompt 注入检测 + 内容审核)
│
├──→ [语义缓存 Redis] ──命中→ 直接返回
│ 未命中
│
├──→ [Embedding Service] (本地 BGE / OpenAI)
│
├──→ [Qdrant 集群] (向量检索 + 租户过滤)
│ ↕
├──→ [BM25 检索 (Elasticsearch)]
│
├──→ [Reranker Service] (BGE-Reranker)
│
├──→ [LLM Service] (OpenAI / Claude / 本地 vLLM)
│
├──→ [LangSmith / OpenTelemetry] (可观测性)
│
└──→ [RAGAS 在线评估] (持续质量监控)
存储层:
Qdrant (向量) + PostgreSQL (元数据) + Redis (缓存) + S3 (原始文档)
水平扩展策略
RAG API Server 是无状态的,可以直接水平扩展。Qdrant 支持分片(每个节点一个分片),多个 API Server 共享同一个 Qdrant 集群。LLM 调用是最大瓶颈,可以考虑本地部署 vLLM(适合 QPS 高的私有化场景)。
文档更新策略
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import hashlib
class DocumentChangeHandler(FileSystemEventHandler):
"""监控文件系统变更,自动触发索引更新"""
def __init__(self, indexer):
self.indexer = indexer
self.processed_hashes = self._load_hash_db()
def on_created(self, event):
if event.src_path.endswith('.pdf'):
self.indexer.index_file(event.src_path)
def on_modified(self, event):
if event.src_path.endswith('.pdf'):
new_hash = self._file_hash(event.src_path)
# 只有内容变化时才重新索引
if new_hash != self.processed_hashes.get(event.src_path):
self.indexer.delete_by_source(event.src_path)
self.indexer.index_file(event.src_path)
self.processed_hashes[event.src_path] = new_hash
def _file_hash(self, path: str) -> str:
with open(path, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
# 启动文件监控
observer = Observer()
observer.schedule(DocumentChangeHandler(indexer), path="./knowledge_base/", recursive=True)
observer.start()
生产上线前检查清单
1. Qdrant API Key 已设置,不暴露公网
2. 所有 LLM API Key 通过环境变量注入,不在代码中硬编码
3. 向量量化(INT8)已开启,内存占用达到预期
4. 核心 Payload 字段已建索引
5. 已完成 RAGAS 基准测试并记录初始指标
6. LangSmith 或 OpenTelemetry 追踪已开启
7. 成本告警已配置(月度 LLM 费用阈值)
8. 备份策略已验证(Qdrant snapshot 定时备份到 S3)
完整课程总结
- 第1章:RAG = 检索 + 增强 + 生成,解决 LLM 的知识截止和幻觉问题
- 第2章:高质量分块是 RAG 地基,递归字符分块 + 父子分块是核心策略
- 第3章:Embedding 把语义变成向量距离,BGE-M3 是中文场景的开源首选
- 第4章:HNSW 让亿级向量毫秒级检索,Qdrant 是生产环境的最佳选择
- 第5章:Qdrant 的 Collection、Point、Payload 过滤是生产 RAG 的基础设施
- 第6章:混合检索 + Reranker + MMR 是 Advanced RAG 的标准配置
- 第7章:LlamaIndex 的 Index/Node/QueryEngine 链路清晰,适合复杂 RAG
- 第8章:LCEL 构建对话 RAG,Redis 持久化记忆是生产标配
- 第9章:RAGAS 四指标量化 RAG 质量,没有指标就没有改进
- 第10章:多租户、安全过滤、可观测性、成本控制——生产级 RAG 缺一不可