Qdrant 核心概念
Collection
Qdrant 中的顶级逻辑单元,类似关系数据库的"表"。一个 Collection 存储相同维度、相同距离度量方式的向量集合。每个 Collection 可以有独立的索引配置和量化策略。
Point
向量库中的最小数据单元,包含三个部分:id(唯一标识符,整数或 UUID)、vector(向量数据)、payload(任意 JSON 元数据)。
Payload
与每个 Point 关联的 JSON 元数据,可以包含任意字段。关键特性:可以对 Payload 字段创建索引,在向量检索时同步进行精确过滤(Filtered Search)。
Shard
分片,将 Collection 数据分散到多个节点。每个分片是一个独立的 HNSW 索引,并行搜索后合并结果。Qdrant 1.8+ 支持自动分片均衡。
Named Vectors
命名向量,允许一个 Point 包含多个不同的向量(如文本向量 + 图像向量),每个向量独立索引,支持多向量混合查询。
安装与部署
方式一:Docker(推荐本地开发)
# Pull the latest image
docker pull qdrant/qdrant
# Start the container (persist data to a local directory)
docker run -d \
  --name qdrant \
  -p 6333:6333 \
  -p 6334:6334 \
  -v $(pwd)/qdrant_storage:/qdrant/storage \
  qdrant/qdrant
# 6333: REST API / Web UI
# 6334: gRPC API (recommended for high-performance production use)
# Verify startup — the root endpoint returns the service name and version.
# (Qdrant's dedicated liveness endpoint is /healthz, not /health.)
curl http://localhost:6333/
# → {"title":"qdrant - vector search engine","version":"1.8.x"}
# Open the Web Dashboard
# http://localhost:6333/dashboard
方式二:Docker Compose(带持久配置)
# docker-compose.yml
# Indentation restored — the pasted snippet had lost it and was invalid YAML.
version: '3.8'
services:
  qdrant:
    image: qdrant/qdrant:v1.8.4
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_data:/qdrant/storage
      - ./qdrant_config.yaml:/qdrant/config/production.yaml
    environment:
      - QDRANT__SERVICE__API_KEY=your-secret-key # must be set in production
volumes:
  qdrant_data:
方式三:Python 嵌入式(仅测试)
# pip install qdrant-client[local]
from qdrant_client import QdrantClient
# In-memory mode (all data is lost when the process exits)
client = QdrantClient(":memory:")
# Local-file mode (persists to the given directory)
client = QdrantClient(path="./qdrant_local")
连接与 Collection 创建
# pip install qdrant-client openai
from qdrant_client import QdrantClient, models
from qdrant_client.models import (
    Distance, VectorParams,
    PointStruct, Filter, FieldCondition, MatchValue,
    SearchRequest, ScoredPoint,
)
# Connect to a local Qdrant instance
client = QdrantClient(
    host="localhost",
    port=6333,
    # api_key="your-secret-key",  # enable when authentication is configured
)
# Create the Collection
client.create_collection(
    collection_name="rag_knowledge_base",
    vectors_config=VectorParams(
        size=1536,  # dimensionality of text-embedding-3-small
        distance=Distance.COSINE,  # cosine similarity
    ),
    # HNSW index parameters (optional; defaults are usually sufficient)
    hnsw_config=models.HnswConfigDiff(
        m=16,  # links per node; higher = better recall, more memory
        ef_construct=100,  # search width while building the index
        on_disk=False,  # True: store the index on disk (for large datasets)
    ),
    # Vector quantization (can greatly reduce memory usage)
    quantization_config=models.ScalarQuantization(
        scalar=models.ScalarQuantizationConfig(
            type=models.ScalarType.INT8,  # 32-bit float -> 8-bit int, ~75% less memory
            always_ram=True,
        )
    ),
    optimizers_config=models.OptimizersConfigDiff(
        indexing_threshold=20000,  # build the index only after this many vectors
    ),
)
print("Collection 创建成功")
info = client.get_collection("rag_knowledge_base")
print(f"向量数:{info.points_count}")
向量写入(Upsert)
from openai import OpenAI
import uuid
# OpenAI client used by the embedding helpers below
openai_client = OpenAI()
def embed_texts(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts via the OpenAI embeddings endpoint.

    Returns one embedding vector (list of floats) per input text,
    in the same order as *texts*.
    """
    resp = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
    )
    return [record.embedding for record in resp.data]
# Prepare the document data
documents = [
    {
        "text": "RAG 由 Meta AI 于 2020 年提出,解决 LLM 幻觉问题。",
        "source": "rag_intro.pdf",
        "chapter": 1,
        "department": "research",
    },
    {
        "text": "Qdrant 使用 Rust 编写,支持 HNSW 索引和混合检索。",
        "source": "vector_db.pdf",
        "chapter": 4,
        "department": "engineering",
    },
    {
        "text": "文档分块策略影响 RAG 系统的检索精度,推荐递归字符分块。",
        "source": "chunking.pdf",
        "chapter": 2,
        "department": "engineering",
    },
]
texts = [doc["text"] for doc in documents]
vectors = embed_texts(texts)
# Build the list of PointStructs
points = [
    PointStruct(
        id=str(uuid.uuid4()),  # use a UUID as the point ID
        vector=vectors[i],
        payload={  # arbitrary JSON payload
            "text": documents[i]["text"],
            "source": documents[i]["source"],
            "chapter": documents[i]["chapter"],
            "department": documents[i]["department"],
        }
    )
    for i, doc in enumerate(documents)
]
# Batch write (upsert: update if the ID exists, insert otherwise)
operation_info = client.upsert(
    collection_name="rag_knowledge_base",
    wait=True,  # block until the write is applied
    points=points
)
print(f"写入状态:{operation_info.status}")  # UpdateStatus.COMPLETED
相似度查询
# Basic vector search
query_text = "RAG 是什么?"
query_vector = embed_texts([query_text])[0]
# NOTE(review): client.search() is deprecated in qdrant-client >= 1.10 in
# favor of query_points(); it still works on the 1.8.x line used here.
results = client.search(
    collection_name="rag_knowledge_base",
    query_vector=query_vector,
    limit=3,  # Top-K
    with_payload=True,  # include the payload in the results
    score_threshold=0.6,  # drop hits whose similarity is below this value
)
for hit in results:
    print(f"分数: {hit.score:.3f} | {hit.payload['text'][:50]}")
# ── Search with a filter condition ───────────────────────
# Retrieve only documents from the engineering department
filtered_results = client.search(
    collection_name="rag_knowledge_base",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="department",
                match=MatchValue(value="engineering")
            )
        ]
    ),
    limit=5,
)
# ── Compound filters: AND / OR / NOT ────────────────
# Built for illustration only; pass it via query_filter= to actually use it.
complex_filter = Filter(
    must=[  # AND conditions
        FieldCondition(key="department", match=MatchValue(value="engineering")),
    ],
    should=[  # OR conditions
        FieldCondition(key="chapter", match=MatchValue(value=2)),
        FieldCondition(key="chapter", match=MatchValue(value=4)),
    ],
    must_not=[  # NOT conditions
        FieldCondition(key="source", match=MatchValue(value="deprecated.pdf")),
    ]
)
范围过滤与全文检索
from qdrant_client.models import Range, MatchText
# Numeric range filter: only content from chapters 2 through 5
range_filter = Filter(
    must=[
        FieldCondition(
            key="chapter",
            range=Range(gte=2, lte=5)  # greater-than-or-equal, less-than-or-equal
        )
    ]
)
# Text-match filter (payload full-text search)
# NOTE(review): MatchText requires a full-text payload index on the field
# (PayloadSchemaType.TEXT) — confirm one exists before relying on this filter.
text_filter = Filter(
    must=[
        FieldCondition(
            key="text",
            match=MatchText(text="Qdrant")  # must contain the keyword "Qdrant"
        )
    ]
)
# Create payload-field indexes (speeds up filtered queries)
client.create_payload_index(
    collection_name="rag_knowledge_base",
    field_name="department",
    field_schema=models.PayloadSchemaType.KEYWORD,  # keyword type
)
client.create_payload_index(
    collection_name="rag_knowledge_base",
    field_name="chapter",
    field_schema=models.PayloadSchemaType.INTEGER,
)
批量管理与更新
# Bulk ingestion (recommended batch size: 100-1000)
def batch_upsert(client, collection_name, documents, batch_size=100):
    """Write documents in batches to keep individual requests small.

    Each document dict must contain a "text" key; the whole dict is
    stored as the point's payload. Writes use wait=False for throughput.
    """
    batch_no = 0
    for start in range(0, len(documents), batch_size):
        chunk = documents[start:start + batch_size]
        chunk_vectors = embed_texts([d["text"] for d in chunk])
        chunk_points = [
            PointStruct(id=str(uuid.uuid4()), vector=vec, payload=doc)
            for doc, vec in zip(chunk, chunk_vectors)
        ]
        client.upsert(collection_name=collection_name, points=chunk_points, wait=False)
        batch_no += 1
        print(f"写入批次 {batch_no},共 {len(chunk_points)} 条")
# Update payload fields only (the vectors are not re-indexed)
client.set_payload(
    collection_name="rag_knowledge_base",
    payload={"status": "updated", "updated_at": "2025-03-01"},
    points=["point-id-1", "point-id-2"],
)
# Delete specific points by ID
client.delete(
    collection_name="rag_knowledge_base",
    points_selector=models.PointIdsList(points=["point-id-1"]),
)
# Delete by payload condition
client.delete(
    collection_name="rag_knowledge_base",
    points_selector=models.FilterSelector(
        filter=Filter(
            must=[FieldCondition(key="source", match=MatchValue(value="old_doc.pdf"))]
        )
    ),
)
Qdrant 生产建议
1. 写入时设置 wait=False 可提升吞吐量,但需要通过 get_collection 轮询确认写入状态。
2. 对高频过滤字段创建 Payload 索引,可将带过滤查询从 O(n) 降至 O(log n)。
3. 开启向量量化(INT8)可将内存占用降低 75%,精度损失通常低于 1%。
完整 RAG 集成示例
from qdrant_client import QdrantClient, models
from openai import OpenAI
from typing import Optional
# Shared clients and collection name for the RAG example below
client = QdrantClient(host="localhost", port=6333)
openai_client = OpenAI()
COLLECTION = "rag_knowledge_base"
def rag_query(
    question: str,
    top_k: int = 4,
    department: Optional[str] = None,
) -> dict:
    """Run the full RAG pipeline: embed, retrieve (optionally filtered), generate.

    Returns a dict with "answer" plus, when documents are found,
    "sources" and "scores" lists aligned with the retrieved hits.
    """
    # 1. Embed the question.
    embedding_response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=question,
    )
    question_vector = embedding_response.data[0].embedding
    # 2. Restrict retrieval to a single department when requested.
    dept_filter = None
    if department:
        dept_filter = models.Filter(
            must=[models.FieldCondition(
                key="department",
                match=models.MatchValue(value=department),
            )]
        )
    # 3. Vector retrieval.
    matches = client.search(
        collection_name=COLLECTION,
        query_vector=question_vector,
        query_filter=dept_filter,
        limit=top_k,
        with_payload=True,
        score_threshold=0.5,
    )
    if not matches:
        return {"answer": "未找到相关文档", "sources": []}
    # 4. Assemble the context block for the prompt.
    context = "\n\n".join(
        f"[来源: {m.payload['source']}]\n{m.payload['text']}"
        for m in matches
    )
    # 5. Generate the answer.
    prompt = f"""你是专业的技术助手。根据以下文档回答问题,不要编造信息。
参考文档:
{context}
问题:{question}
答案:"""
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return {
        "answer": completion.choices[0].message.content,
        "sources": [m.payload["source"] for m in matches],
        "scores": [m.score for m in matches],
    }
# Usage example
result = rag_query("Qdrant 如何实现高性能检索?", department="engineering")
print(result["answer"])
print("来源:", result["sources"])
本章总结
- Qdrant 核心概念:Collection(表)→ Point(行)→ Vector + Payload(列)
- 部署方式:Docker 最简,生产建议挂载持久卷并设置 API Key 认证
- 过滤查询是 Qdrant 的核心竞争力,对过滤字段建索引是必须的性能优化
- 量化(INT8)在损失不足 1% 精度的情况下将内存降低 75%,生产强烈推荐