架构蓝图
┌──────────────┐ ┌─────────────┐
│ Upload API │──→ │ Redis Queue │
└──────────────┘ └─────┬───────┘
▼
┌──────────────┐
│ Worker pool │ (GPU 索引节点)
│ PDF → img │
│ img → 向量 │
└──────┬───────┘
▼
┌──────────────┐
│ Qdrant │ (热:内存 binary)
│ + NVMe │ (冷:full precision)
└──────┬───────┘
▲
│
┌──────────────┐ ┌─────┴───────┐
│ Query API │──→ │ Retriever │ (CPU/GPU 共用)
└──────────────┘ └─────┬───────┘
▼
┌──────────────┐
│ VLM 生成 │ (Claude/GPT)
└──────────────┘
推理服务化选项
自写 FastAPI + transformers
最简单,写一个
/embed 端点,内部 batch 调用模型。适合中小流量。HuggingFace TEI(Text Embeddings Inference)
Rust 写的推理容器,支持多向量输出。启动:
docker run --gpus all -p 8080:80 ghcr.io/huggingface/text-embeddings-inference --model-id vidore/colpali-v1.2。比 Python 快 30%。vLLM(实验性)
2025 底 vLLM 开始支持视觉 embedding 模型,PagedAttention 让大 batch 吞吐 3-5 倍。
Ray Serve
需要自动扩缩容、多模型共存时上。
FastAPI 示例
from fastapi import FastAPI, UploadFile from pydantic import BaseModel from PIL import Image from io import BytesIO from transformers import ColPaliForRetrieval, ColPaliProcessor import torch, asyncio app = FastAPI() model = ColPaliForRetrieval.from_pretrained("vidore/colpali-v1.2", torch_dtype=torch.bfloat16, device_map="cuda").eval() processor = ColPaliProcessor.from_pretrained("vidore/colpali-v1.2") class Query(BaseModel): text: str @app.post("/embed/image") async def embed_image(file: UploadFile): img = Image.open(BytesIO(await file.read())) batch = processor.process_images([img]).to("cuda") with torch.no_grad(): vecs = model(**batch).embeddings[0] return {"vec": vecs.cpu().float().tolist()} @app.post("/embed/query") async def embed_query(q: Query): batch = processor.process_queries([q.text]).to("cuda") with torch.no_grad(): vecs = model(**batch).embeddings[0] return {"vec": vecs.cpu().float().tolist()}
动态批处理
查询 QPS 高时,多个独立请求可以在 GPU 里攒一批处理,延迟不增、吞吐翻倍。TEI 自带,自写版需手动:
from asyncio import Queue, gather, create_task queue: Queue = Queue() async def batcher(): while True: batch = [] batch.append(await queue.get()) try: while len(batch) < 16: batch.append(queue.get_nowait()) except: pass queries = [b["q"] for b in batch] results = run_model(queries) for b, r in zip(batch, results): b["fut"].set_result(r)
批量索引流水线
1. Ingestion
文档上传到 S3 → 触发 SQS/Redis 消息 → worker 拉取
2. 预处理
PDF→图,失败重试 3 次,损坏文件进死信队列人工 review
3. 向量化
GPU worker 按 batch 处理,每页记录 embedding + 预览图 URL
4. 写入 Qdrant
attribute metadata(doc_id、tags、permission)便于过滤查询
5. ACK
一个文档所有页面都写成功后才 ack 消息,失败回滚不留半成品
冷启动优化
# 模型在 worker 进程启动时预加载 @app.on_event("startup") async def warmup(): # 冷启动跑一次,让 cudnn 选算法 dummy_img = Image.new("RGB", (448, 448)) batch = processor.process_images([dummy_img]).to("cuda") with torch.no_grad(): _ = model(**batch)
Serverless 不适合
ColPali 模型 6GB+,每次冷启动要加载 20 秒。AWS Lambda、Cloud Run 这种按需启动的平台不合适——要么常驻,要么 Kubernetes + scale-to-zero(至少留 1 副本避免首请求超时)。
ColPali 模型 6GB+,每次冷启动要加载 20 秒。AWS Lambda、Cloud Run 这种按需启动的平台不合适——要么常驻,要么 Kubernetes + scale-to-zero(至少留 1 副本避免首请求超时)。
热更新与模型版本
A/B 切流
新模型部署成
/v2/embed,先 1% 流量镜像评估,确认 nDCG 无回归再切。Embedding 版本兼容性
新旧模型的 embedding 空间不可混算。更换模型等于全量重做索引。规划停机窗口或双写索引。
Shadow indexing
切换模型前 2 天开始用新模型并行建一个 shadow 索引,完毕再原子切换。
监控指标
- 📈 每秒 embedding 数 / 失败率 / p50/p99 延迟
- 📈 GPU 利用率 / 显存占用
- 📈 Qdrant 写入延迟 / 查询召回数 / binary→full 升级率
- 📈 业务侧 nDCG@5(定期 A/B 抽样真实 query)
- 📈 VLM 下游生成错误率 / 幻觉率(人工抽检 + 自动判分)
灾备与多副本
- 模型权重打到容器镜像里,避免依赖 HF 下载
- 向量数据库至少 3 节点集群,支持跨可用区复制
- 原始 PDF 永久存 S3 + Glacier,向量能重建,原文不能丢
- Schema/model 版本写 registry(如 MLflow / Weights&Biases),便于回溯
本章小结
- 推理服务化:TEI/vLLM 优先,自写 FastAPI 适合起步
- 动态批处理 + 预热让查询 QPS 飙升
- 索引流水线:队列驱动、文档级事务、失败可重试
- 模型升级 ≠ 配置升级——要重建索引,提前规划
- 监控四维:吞吐、资源、质量、业务效果