Chapter 05

Qdrant 实战

从安装部署到生产级查询,全面掌握 Qdrant 1.8+ 的 Collection 设计、向量操作与 Python SDK

Qdrant 核心概念

Collection
Qdrant 中的顶级逻辑单元,类似关系数据库的"表"。一个 Collection 存储相同维度、相同距离度量方式的向量集合。每个 Collection 可以有独立的索引配置和量化策略。
Point
向量库中的最小数据单元,包含三个部分:id(唯一标识符,整数或 UUID)、vector(向量数据)、payload(任意 JSON 元数据)。
Payload
与每个 Point 关联的 JSON 元数据,可以包含任意字段。关键特性:可以对 Payload 字段创建索引,在向量检索时同步进行精确过滤(Filtered Search)。
Shard
分片,将 Collection 数据分散到多个节点。每个分片是一个独立的 HNSW 索引,并行搜索后合并结果。Qdrant 1.8+ 支持自动分片均衡。
Named Vectors
命名向量,允许一个 Point 包含多个不同的向量(如文本向量 + 图像向量),每个向量独立索引,支持多向量混合查询。

安装与部署

方式一:Docker(推荐本地开发)

# Pull the latest image
docker pull qdrant/qdrant

# Start the container (persist data to a local directory)
docker run -d \
  --name qdrant \
  -p 6333:6333 \
  -p 6334:6334 \
  -v $(pwd)/qdrant_storage:/qdrant/storage \
  qdrant/qdrant

# 6333: REST API / Web UI
# 6334: gRPC API (recommended for high-performance production use)

# Verify the service is alive (Qdrant's health probe endpoints are
# /healthz, /livez and /readyz — they return a plain-text confirmation)
curl http://localhost:6333/healthz
# → healthz check passed

# Check the running version (the root endpoint returns service info as JSON)
curl http://localhost:6333/
# → {"title":"qdrant - vector search engine","version":"1.8.x"}

# Open the Web Dashboard
# http://localhost:6333/dashboard

方式二:Docker Compose(带持久配置)

# docker-compose.yml
version: '3.8'
services:
  qdrant:
    image: qdrant/qdrant:v1.8.4  # pin an exact version for reproducible deployments
    ports:
      - "6333:6333"  # REST API / Web UI
      - "6334:6334"  # gRPC API
    volumes:
      - qdrant_data:/qdrant/storage  # named volume for vector data
      - ./qdrant_config.yaml:/qdrant/config/production.yaml  # custom server config
    environment:
      - QDRANT__SERVICE__API_KEY=your-secret-key  # must be set in production
volumes:
  qdrant_data:

方式三:Python 嵌入式(仅测试)

# pip install qdrant-client[local]
from qdrant_client import QdrantClient

# In-memory mode (all data is lost when the process exits)
client = QdrantClient(":memory:")

# Local-file mode (persists to disk, still runs inside the Python process)
client = QdrantClient(path="./qdrant_local")

连接与 Collection 创建

# pip install qdrant-client openai
from qdrant_client import QdrantClient, models
from qdrant_client.models import (
    Distance, VectorParams,
    PointStruct, Filter, FieldCondition, MatchValue,
    SearchRequest, ScoredPoint,
)

# Connect to a local Qdrant instance
client = QdrantClient(
    host="localhost",
    port=6333,
    # api_key="your-secret-key",  # enable when authentication is configured
)

# Create the Collection
client.create_collection(
    collection_name="rag_knowledge_base",
    vectors_config=VectorParams(
        size=1536,              # dimensionality of text-embedding-3-small
        distance=Distance.COSINE, # cosine similarity
    ),
    # HNSW index parameters (optional; the defaults are usually sufficient)
    hnsw_config=models.HnswConfigDiff(
        m=16,                  # links per node; higher = better recall, more memory
        ef_construct=100,      # search width while building the index
        on_disk=False,         # True: store the index on disk (for large datasets)
    ),
    # Vector quantization (can drastically reduce memory usage)
    quantization_config=models.ScalarQuantization(
        scalar=models.ScalarQuantizationConfig(
            type=models.ScalarType.INT8,  # 32-bit float → 8-bit int, ~75% less memory
            always_ram=True,
        )
    ),
    optimizers_config=models.OptimizersConfigDiff(
        indexing_threshold=20000,  # build the index only after this many vectors
    ),
)

print("Collection 创建成功")
info = client.get_collection("rag_knowledge_base")
print(f"向量数:{info.points_count}")

向量写入(Upsert)

from openai import OpenAI
import uuid

openai_client = OpenAI()

def embed_texts(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts with OpenAI's text-embedding-3-small (1536-dim).

    Args:
        texts: Raw strings to embed; returned vectors preserve input order.

    Returns:
        One embedding vector (list of floats) per input text.
    """
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

# 准备文档数据
# Source documents: each dict doubles as the Point's payload.
documents = [
    {
        "text": "RAG 由 Meta AI 于 2020 年提出,解决 LLM 幻觉问题。",
        "source": "rag_intro.pdf",
        "chapter": 1,
        "department": "research",
    },
    {
        "text": "Qdrant 使用 Rust 编写,支持 HNSW 索引和混合检索。",
        "source": "vector_db.pdf",
        "chapter": 4,
        "department": "engineering",
    },
    {
        "text": "文档分块策略影响 RAG 系统的检索精度,推荐递归字符分块。",
        "source": "chunking.pdf",
        "chapter": 2,
        "department": "engineering",
    },
]

texts = [doc["text"] for doc in documents]
vectors = embed_texts(texts)

# Build the PointStruct list — zip keeps each document aligned with its
# vector, avoiding manual index bookkeeping.
points = [
    PointStruct(
        id=str(uuid.uuid4()),   # UUIDs as point IDs
        vector=vector,
        payload=dict(doc),      # the document dict already holds exactly the payload fields
    )
    for doc, vector in zip(documents, vectors)
]

# Batch write (upsert: update if the ID exists, insert otherwise)
operation_info = client.upsert(
    collection_name="rag_knowledge_base",
    wait=True,   # block until the write is fully applied
    points=points
)
print(f"写入状态:{operation_info.status}")  # UpdateStatus.COMPLETED

相似度查询

# Basic vector search
query_text = "RAG 是什么?"
query_vector = embed_texts([query_text])[0]

results = client.search(
    collection_name="rag_knowledge_base",
    query_vector=query_vector,
    limit=3,                    # Top-K
    with_payload=True,          # include the payload in results
    score_threshold=0.6,        # similarity cutoff (hits below this are dropped)
)

for hit in results:
    print(f"分数: {hit.score:.3f} | {hit.payload['text'][:50]}")

# ── Filtered search ──────────────────────────
# Only retrieve documents from the engineering department
filtered_results = client.search(
    collection_name="rag_knowledge_base",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="department",
                match=MatchValue(value="engineering")
            )
        ]
    ),
    limit=5,
)

# ── Compound filters: AND / OR / NOT ─────────
complex_filter = Filter(
    must=[
        FieldCondition(key="department", match=MatchValue(value="engineering")),
    ],
    should=[    # OR conditions
        FieldCondition(key="chapter", match=MatchValue(value=2)),
        FieldCondition(key="chapter", match=MatchValue(value=4)),
    ],
    must_not=[  # NOT conditions
        FieldCondition(key="source", match=MatchValue(value="deprecated.pdf")),
    ]
)

范围过滤与全文检索

from qdrant_client.models import Range, MatchText

# Numeric range filter: only retrieve content from chapters 2 through 5
range_filter = Filter(
    must=[
        FieldCondition(
            key="chapter",
            range=Range(gte=2, lte=5)  # greater-than-or-equal, less-than-or-equal
        )
    ]
)

# Text-match filter (full-text search over a payload field)
text_filter = Filter(
    must=[
        FieldCondition(
            key="text",
            match=MatchText(text="Qdrant")  # matches entries containing "Qdrant"
        )
    ]
)

# Create payload-field indexes (speeds up filtered queries)
client.create_payload_index(
    collection_name="rag_knowledge_base",
    field_name="department",
    field_schema=models.PayloadSchemaType.KEYWORD,  # keyword type
)
client.create_payload_index(
    collection_name="rag_knowledge_base",
    field_name="chapter",
    field_schema=models.PayloadSchemaType.INTEGER,
)

批量管理与更新

# 大批量写入(推荐批次大小 100-1000)
# Large-batch ingestion (recommended batch size: 100-1000)
def batch_upsert(client, collection_name, documents, batch_size=100):
    """Write documents in batches to avoid oversized single requests.

    Args:
        client: A connected QdrantClient.
        collection_name: Target collection name.
        documents: Dicts each containing at least a "text" field; the whole
            dict is stored as the point's payload.
        batch_size: Number of documents embedded and written per request.
    """
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        vectors = embed_texts([d["text"] for d in batch])

        # zip pairs each document with its embedding — no index bookkeeping
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload=doc
            )
            for doc, vector in zip(batch, vectors)
        ]
        # wait=False trades confirmation for throughput; poll the collection
        # status separately if you need a completion guarantee
        client.upsert(collection_name=collection_name, points=points, wait=False)
        print(f"写入批次 {start//batch_size + 1},共 {len(points)} 条")

# Update a payload (does not re-index the vector)
client.set_payload(
    collection_name="rag_knowledge_base",
    payload={"status": "updated", "updated_at": "2025-03-01"},
    points=["point-id-1", "point-id-2"],
)

# Delete specific points by ID
client.delete(
    collection_name="rag_knowledge_base",
    points_selector=models.PointIdsList(points=["point-id-1"]),
)

# Delete all points matching a payload condition
client.delete(
    collection_name="rag_knowledge_base",
    points_selector=models.FilterSelector(
        filter=Filter(
            must=[FieldCondition(key="source", match=MatchValue(value="old_doc.pdf"))]
        )
    ),
)
Qdrant 生产建议

1. 写入时设置 wait=False 可提升吞吐量,但需要通过 get_collection 轮询确认状态。
2. 对高频过滤字段创建 Payload 索引,可将带过滤查询从 O(n) 降至 O(log n)。
3. 开启向量量化(INT8)可将内存占用降低 75%,精度损失通常低于 1%。

完整 RAG 集成示例

from qdrant_client import QdrantClient, models
from openai import OpenAI
from typing import Optional

client = QdrantClient(host="localhost", port=6333)
openai_client = OpenAI()
COLLECTION = "rag_knowledge_base"

def rag_query(
    question: str,
    top_k: int = 4,
    department: Optional[str] = None,
) -> dict:
    """End-to-end RAG query pipeline with optional payload filtering.

    Args:
        question: Natural-language question from the user.
        top_k: Maximum number of context chunks to retrieve.
        department: If given, restrict retrieval to documents whose
            "department" payload field matches this value.

    Returns:
        Dict with "answer" (generated text) plus "sources" and "scores"
        for the retrieved hits; a fallback answer when nothing matches.
    """

    # 1. Embed the question
    q_vec = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=question
    ).data[0].embedding

    # 2. Build the optional filter
    search_filter = None
    if department:
        search_filter = models.Filter(
            must=[models.FieldCondition(
                key="department",
                match=models.MatchValue(value=department)
            )]
        )

    # 3. Vector search
    hits = client.search(
        collection_name=COLLECTION,
        query_vector=q_vec,
        query_filter=search_filter,
        limit=top_k,
        with_payload=True,
        score_threshold=0.5,
    )

    if not hits:
        return {"answer": "未找到相关文档", "sources": []}

    # 4. Assemble the context block from the retrieved payloads
    context = "\n\n".join([
        f"[来源: {h.payload['source']}]\n{h.payload['text']}"
        for h in hits
    ])

    # 5. Generate the answer with the retrieved context grounded in the prompt
    prompt = f"""你是专业的技术助手。根据以下文档回答问题,不要编造信息。

参考文档:
{context}

问题:{question}
答案:"""

    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )

    return {
        "answer": response.choices[0].message.content,
        "sources": [h.payload["source"] for h in hits],
        "scores": [h.score for h in hits],
    }

# Usage
result = rag_query("Qdrant 如何实现高性能检索?", department="engineering")
print(result["answer"])
print("来源:", result["sources"])

本章总结