Chapter 07

LlamaIndex 构建 RAG

深入 LlamaIndex 0.10+ 的核心抽象——Index、Node、Query Engine——构建生产级 RAG 流水线

LlamaIndex 核心概念体系

LlamaIndex(原名 GPT Index)是专为 LLM 应用设计的数据框架,在 RAG 领域与 LangChain 并列为两大主流选择。0.10 版本引入了更清晰的模块化架构。

Document
原始文档对象,包含 text(文本内容)和 metadata(元数据)。对应加载阶段的输出,是 Node 的原始来源。
Node
LlamaIndex 的核心数据单元,代表文档的一个语义分块。Node 之间可以有关联关系(父子关系、引用关系),形成知识图谱结构。
Index
将 Node 集合组织成可检索结构的核心组件。VectorStoreIndex 是最常用的实现,将 Node 的 Embedding 存入向量数据库,支持语义检索。
Retriever
从 Index 中检索相关 Node 的组件。VectorIndexRetriever 进行向量检索,可配置 top_k、相似度阈值、元数据过滤等。
Query Engine
端到端的查询接口,封装了检索 → 后处理 → 合成答案的完整流程。支持流式输出、引用源追踪、响应格式自定义。
Response Synthesizer
负责将检索到的 Node 和原始查询合成最终答案的组件。支持多种模式:compact(压缩上下文)、refine(迭代精炼)、tree_summarize(树状摘要)。

安装与基础配置

# pip install llama-index llama-index-embeddings-openai
# pip install llama-index-vector-stores-qdrant qdrant-client

from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    StorageContext,
    Settings,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
import os

# 全局配置(0.10+ 推荐使用 Settings 代替 ServiceContext)
Settings.llm = OpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    max_tokens=1024,
)
Settings.embed_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    embed_batch_size=100,
)
Settings.chunk_size = 1024       # Node 分块大小
Settings.chunk_overlap = 128     # 分块重叠

最简 RAG:5 行代码

# 1. 加载文档
documents = SimpleDirectoryReader("./data/").load_data()

# 2. 建立索引(自动完成分块 + Embedding + 存储)
index = VectorStoreIndex.from_documents(documents, show_progress=True)

# 3. 创建查询引擎
query_engine = index.as_query_engine(similarity_top_k=4)

# 4. 查询
response = query_engine.query("RAG 是如何解决 LLM 幻觉问题的?")
print(response)

# 5. 查看引用来源
for node in response.source_nodes:
    print(f"来源:{node.metadata['file_name']} | 相似度:{node.score:.3f}")

与 Qdrant 集成

from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient, models

# 初始化 Qdrant
qdrant_client = QdrantClient(host="localhost", port=6333)

# 创建向量存储
vector_store = QdrantVectorStore(
    client=qdrant_client,
    collection_name="llamaindex_rag",
)

# StorageContext 封装存储后端
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# 建立索引(写入 Qdrant)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    show_progress=True,
)

# 下次启动直接从 Qdrant 加载,无需重新索引
index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store
)

query_engine = index.as_query_engine(
    similarity_top_k=5,
    response_mode="compact",   # compact / refine / tree_summarize
)

response = query_engine.query("Qdrant 的量化策略有哪些?")
print(str(response))

高级检索:多种 Retriever 配置

from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import (
    SimilarityPostprocessor,
    MetadataReplacementPostProcessor,
)
from llama_index.postprocessor.flag_embedding_reranker import FlagEmbeddingReranker

# 1. 向量检索器
retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=20,   # 先粗召回 20 个
    filters=None,          # MetadataFilters 可加过滤
)

# 2. 后处理节点:过滤低相似度 + BGE Reranker + 相似度阈值
postprocessors = [
    SimilarityPostprocessor(similarity_cutoff=0.5),  # 过滤分数 < 0.5 的
    FlagEmbeddingReranker(
        model="BAAI/bge-reranker-v2-m3",
        top_n=5,           # Reranker 后保留 5 个
    ),
]

# 3. 组装查询引擎
query_engine = RetrieverQueryEngine(
    retriever=retriever,
    node_postprocessors=postprocessors,
)

元数据过滤(Metadata Filtering)

from llama_index.core.vector_stores import (
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
    FilterCondition,
)

# 只检索特定部门且章节在 1-5 的文档
filters = MetadataFilters(
    filters=[
        MetadataFilter(
            key="department",
            value="engineering",
            operator=FilterOperator.EQ
        ),
        MetadataFilter(
            key="chapter",
            value=5,
            operator=FilterOperator.LTE  # less_than_equal
        ),
    ],
    condition=FilterCondition.AND
)

# 应用过滤
filtered_engine = index.as_query_engine(
    similarity_top_k=5,
    filters=filters,
)

增量文档更新策略

生产系统中文档经常需要更新。LlamaIndex 提供了增量索引管理,避免全量重建索引。

from llama_index.core import StorageContext, load_index_from_storage
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.node_parser import SentenceSplitter

# 使用摄入流水线(带缓存,避免重复处理)
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=128),
        Settings.embed_model,
    ],
    vector_store=vector_store,
    cache=IngestionCache(
        cache="./pipeline_cache"  # 持久化缓存,相同文档不重复 Embed
    )
)

# 只处理新文档
new_documents = SimpleDirectoryReader("./new_data/").load_data()
nodes = pipeline.run(documents=new_documents)
print(f"新增 {len(nodes)} 个 Node")

# 删除特定来源的文档
from llama_index.core import DocumentStore

# 通过 doc_id 删除(每个 Document 有唯一 ID)
index.delete_ref_doc("deprecated_doc_id", delete_from_docstore=True)

流式输出与异步查询

import asyncio

# 流式输出(边生成边显示)
streaming_engine = index.as_query_engine(streaming=True)
response = streaming_engine.query("详细解释 RAG 的工作流程")

for token in response.response_gen:
    print(token, end="", flush=True)

# 异步查询(适合 FastAPI 等异步框架)
async_engine = index.as_async_query_engine(similarity_top_k=4)

async def ask(question: str):
    response = await async_engine.aquery(question)
    return str(response)

# 并发查询多个问题
questions = [
    "RAG 是什么?",
    "Qdrant 的索引算法是什么?",
    "如何评估 RAG 质量?",
]

async def batch_query():
    tasks = [ask(q) for q in questions]
    answers = await asyncio.gather(*tasks)
    return dict(zip(questions, answers))

results = asyncio.run(batch_query())
LlamaIndex 0.10 Breaking Changes

0.10 版本将大量 integration 拆分为独立包(如 llama-index-vector-stores-qdrant),不再内置。迁移时需要单独安装对应的 integration 包,并从新的路径导入。官方迁移指南:docs.llamaindex.ai/migration

RAG 与 Agent 结合

from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent

# 将查询引擎包装为 Tool
rag_tool = QueryEngineTool(
    query_engine=query_engine,
    metadata=ToolMetadata(
        name="rag_knowledge_base",
        description="查询 RAG 和向量数据库相关知识,用于回答技术问题",
    ),
)

# 创建 ReAct Agent(可以决定何时调用 RAG)
agent = ReActAgent.from_tools(
    tools=[rag_tool],
    llm=Settings.llm,
    verbose=True,
    max_iterations=5,
)

response = agent.chat("请比较 Qdrant 和 Milvus 的适用场景,给出选型建议")
print(response)

本章总结