Chapter 10

生产级 RAG 系统设计

从原型到生产——多租户架构、安全访问控制、混合存储、可观测性监控与成本优化的完整蓝图

生产级 RAG 的挑战清单

一个能运行的 RAG Demo 距离生产级系统还有巨大的工程鸿沟。以下是将 RAG 推向生产时必须解决的核心问题:

多租户与权限

  • 不同用户/部门只能访问各自的文档
  • 行级权限控制(某用户只能看某类文件)
  • 租户数据隔离(A 的检索不影响 B)

可靠性与性能

  • LLM API 超时、限速的重试策略
  • 高并发下的检索稳定性
  • 大文档批量索引不阻塞查询服务

可观测性

  • 每次查询的延迟分布和成本追踪
  • 检索质量的在线监控
  • 异常告警和根因分析

成本控制

  • Embedding API 费用随文档量线性增长
  • LLM 生成是最大成本中心
  • 向量存储的内存成本

多租户架构设计

多租户 RAG 有三种常见的隔离策略,各有权衡:

策略 实现方式 隔离程度 成本 适合场景
Collection 隔离 每个租户一个 Qdrant Collection 完全隔离 高(内存线性增长) 大企业客户,数量少(<100)
Payload 过滤 共享 Collection,用 tenant_id 过滤 逻辑隔离 SaaS 场景,租户数量多
命名空间 Pinecone Namespace / Qdrant 分组 中等 中等规模,简化管理

基于 Payload 过滤的多租户实现

from qdrant_client import QdrantClient, models
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Optional
import jwt

app = FastAPI()
client = QdrantClient(host="localhost", port=6333)

def get_tenant_id(authorization: str = Header(...)) -> str:
    """从 JWT Token 提取租户 ID"""
    try:
        token = authorization.replace("Bearer ", "")
        payload = jwt.decode(token, "secret", algorithms=["HS256"])
        return payload["tenant_id"]
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid token")

def tenant_search(
    query_vector: list[float],
    tenant_id: str,
    top_k: int = 5,
    extra_filter: Optional[models.Filter] = None,
):
    """带租户隔离的向量检索"""
    # 强制添加 tenant_id 过滤条件
    tenant_filter = models.Filter(
        must=[
            models.FieldCondition(
                key="tenant_id",
                match=models.MatchValue(value=tenant_id)
            )
        ]
    )

    # 如有额外过滤条件,合并
    if extra_filter:
        combined_filter = models.Filter(
            must=[tenant_filter, extra_filter]
        )
    else:
        combined_filter = tenant_filter

    return client.search(
        collection_name="shared_collection",
        query_vector=query_vector,
        query_filter=combined_filter,
        limit=top_k,
        with_payload=True,
    )

@app.post("/query")
async def query(
    question: str,
    tenant_id: str = Depends(get_tenant_id)
):
    # 只检索该租户的文档
    q_vec = embed(question)
    results = tenant_search(q_vec, tenant_id)
    return generate_answer(question, results)

安全过滤与内容审核

from openai import OpenAI

openai_client = OpenAI()

class RAGSecurityLayer:
    """RAG 安全层:输入过滤 + 输出审核"""

    BLOCKED_PATTERNS = [
        "忽略之前的指令", "ignore previous instructions",
        "扮演", "pretend you are",
        "jailbreak", "越狱",
    ]

    def check_prompt_injection(self, query: str) -> bool:
        """检测 Prompt 注入攻击"""
        query_lower = query.lower()
        return any(
            pattern.lower() in query_lower
            for pattern in self.BLOCKED_PATTERNS
        )

    def check_content_policy(self, text: str) -> bool:
        """使用 OpenAI Moderation API 审核内容"""
        response = openai_client.moderations.create(input=text)
        result = response.results[0]
        return not result.flagged

    def safe_query(self, query: str) -> str:
        """安全查询:输入检查 → RAG → 输出审核"""
        # 1. Prompt 注入检测
        if self.check_prompt_injection(query):
            raise ValueError("检测到潜在的 Prompt 注入攻击")

        # 2. 查询内容审核
        if not self.check_content_policy(query):
            raise ValueError("查询内容不符合使用政策")

        # 3. 运行 RAG
        answer = rag_chain.invoke(query)

        # 4. 输出审核
        if not self.check_content_policy(answer):
            return "抱歉,我无法提供该问题的答案。"

        return answer

可观测性:LangSmith 集成

import os
from langsmith import Client

# 设置 LangSmith 追踪(在 .env 文件中配置)
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"

# 所有 LangChain 调用会自动追踪,无需修改代码
# LangSmith Dashboard 显示:延迟、Token 使用、检索结果、LLM 输入输出

# 自定义追踪指标
from langsmith import traceable

@traceable(name="rag_query", tags=["production"])
def traced_rag_query(question: str, tenant_id: str) -> dict:
    import time
    start = time.time()

    results = rag_chain.invoke(question)

    return {
        "answer": results,
        "latency_ms": (time.time() - start) * 1000,
        "tenant_id": tenant_id,
    }

成本控制策略

from functools import lru_cache
import hashlib
import redis
import json

redis_client = redis.Redis(host="localhost", port=6379)

class CostOptimizedRAG:
    """成本优化的 RAG 实现"""

    def __init__(self):
        self.embed_cache = {}  # 内存 Embedding 缓存
        self.answer_ttl = 3600  # 答案缓存 1 小时

    def cached_embed(self, text: str) -> list[float]:
        """Embedding 缓存,相同文本不重复请求 API"""
        key = hashlib.md5(text.encode()).hexdigest()
        if key not in self.embed_cache:
            self.embed_cache[key] = embed(text)
        return self.embed_cache[key]

    def cached_answer(self, question: str, tenant_id: str) -> Optional[str]:
        """检查 Redis 缓存是否有该问题的答案"""
        cache_key = f"rag:{tenant_id}:{hashlib.md5(question.encode()).hexdigest()}"
        cached = redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    def save_answer(self, question: str, tenant_id: str, answer: str):
        cache_key = f"rag:{tenant_id}:{hashlib.md5(question.encode()).hexdigest()}"
        redis_client.setex(cache_key, self.answer_ttl, json.dumps(answer))

    def smart_model_selection(self, question: str) -> str:
        """根据问题复杂度选择不同档次的 LLM"""
        # 简单问题用便宜模型
        simple_keywords = ["是什么", "定义", "who is", "what is"]
        if any(kw in question for kw in simple_keywords):
            return "gpt-4o-mini"   # 便宜 10x
        # 复杂推理用强模型
        return "gpt-4o"

# 成本估算工具
class CostTracker:
    PRICES = {
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},  # $/1K tokens
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "text-embedding-3-small": {"input": 0.00002, "output": 0},
    }

    def estimate_monthly_cost(self, queries_per_day: int, docs: int) -> dict:
        # 假设:每次查询平均 1000 input + 300 output tokens
        llm_cost = queries_per_day * 30 * (
            1000 * self.PRICES["gpt-4o-mini"]["input"] / 1000 +
            300 * self.PRICES["gpt-4o-mini"]["output"] / 1000
        )
        embed_cost = docs * 500 * self.PRICES["text-embedding-3-small"]["input"] / 1000
        return {"monthly_llm_usd": llm_cost, "one_time_embed_usd": embed_cost}

混合存储:向量数据库 + 图数据库

复杂的企业知识图谱场景下,纯向量检索有局限:无法处理"找所有直接下属文件"这样的结构化关系查询。向量 + 图的混合架构可以解决多跳推理问题。

# 向量数据库:语义检索(Qdrant)
# 图数据库:关系遍历(Neo4j)

from neo4j import GraphDatabase

neo4j_driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password")
)

def hybrid_search(question: str) -> str:
    """
    混合检索:
    1. 向量检索找到相关文档节点
    2. 图遍历扩展找相关联的文档
    3. 合并后送入 LLM
    """
    # 第一步:向量检索
    q_vec = embed(question)
    vector_hits = client.search("rag_knowledge_base", q_vec, limit=5)
    seed_ids = [h.payload["doc_id"] for h in vector_hits]

    # 第二步:图遍历扩展
    with neo4j_driver.session() as session:
        result = session.run("""
            MATCH (d:Document)-[:REFERENCES|RELATED_TO*1..2]->(related)
            WHERE d.doc_id IN $seed_ids
            RETURN DISTINCT related.doc_id AS doc_id, related.content AS content
            LIMIT 10
        """, seed_ids=seed_ids)
        graph_docs = [row["content"] for row in result]

    # 第三步:合并所有上下文
    all_contexts = (
        [h.payload["text"] for h in vector_hits] + graph_docs
    )

    return generate_answer(question, all_contexts)

生产部署架构图

用户请求
    │
    ▼
[API Gateway / 负载均衡]
    │
    ├──→ [认证服务] → 提取 tenant_id
    │
    ▼
[RAG API Server (FastAPI / 多实例)]
    │
    ├──→ [安全过滤层] (Prompt 注入检测 + 内容审核)
    │
    ├──→ [语义缓存 Redis] ──命中→ 直接返回
    │         未命中
    │
    ├──→ [Embedding Service] (本地 BGE / OpenAI)
    │
    ├──→ [Qdrant 集群] (向量检索 + 租户过滤)
    │         ↕
    ├──→ [BM25 检索 (Elasticsearch)]
    │
    ├──→ [Reranker Service] (BGE-Reranker)
    │
    ├──→ [LLM Service] (OpenAI / Claude / 本地 vLLM)
    │
    ├──→ [LangSmith / OpenTelemetry] (可观测性)
    │
    └──→ [RAGAS 在线评估] (持续质量监控)

存储层:
    Qdrant (向量) + PostgreSQL (元数据) + Redis (缓存) + S3 (原始文档)
水平扩展策略

RAG API Server 是无状态的,可以直接水平扩展。Qdrant 支持分片(每个节点一个分片),多个 API Server 共享同一个 Qdrant 集群。LLM 调用是最大瓶颈,可以考虑本地部署 vLLM(适合 QPS 高的私有化场景)。

文档更新策略

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import hashlib

class DocumentChangeHandler(FileSystemEventHandler):
    """监控文件系统变更,自动触发索引更新"""

    def __init__(self, indexer):
        self.indexer = indexer
        self.processed_hashes = self._load_hash_db()

    def on_created(self, event):
        if event.src_path.endswith('.pdf'):
            self.indexer.index_file(event.src_path)

    def on_modified(self, event):
        if event.src_path.endswith('.pdf'):
            new_hash = self._file_hash(event.src_path)
            # 只有内容变化时才重新索引
            if new_hash != self.processed_hashes.get(event.src_path):
                self.indexer.delete_by_source(event.src_path)
                self.indexer.index_file(event.src_path)
                self.processed_hashes[event.src_path] = new_hash

    def _file_hash(self, path: str) -> str:
        with open(path, 'rb') as f:
            return hashlib.sha256(f.read()).hexdigest()

# 启动文件监控
observer = Observer()
observer.schedule(DocumentChangeHandler(indexer), path="./knowledge_base/", recursive=True)
observer.start()
生产上线前检查清单

1. Qdrant API Key 已设置,不暴露公网
2. 所有 LLM API Key 通过环境变量注入,不在代码中硬编码
3. 向量量化(INT8)已开启,内存占用达到预期
4. 核心 Payload 字段已建索引
5. 已完成 RAGAS 基准测试并记录初始指标
6. LangSmith 或 OpenTelemetry 追踪已开启
7. 成本告警已配置(月度 LLM 费用阈值)
8. 备份策略已验证(Qdrant snapshot 定时备份到 S3)

完整课程总结