Chapter 04

Ingestion Pipeline · 切块与嵌入

数据进门之后,切块嵌入决定了整个 RAG 的命中率。chunk 切小查得准但答得碎,切大答得全但噪声多;embedding 模型选错连中英混查都做不好。这一章把两个决策一次讲完。

一、Ingestion 三件事

Documents ──▶ ①Splitter ──▶ Nodes ──▶ ②Metadata 加工 ──▶ ③Embedding ──▶ Vector Store

LlamaIndex 把这三步抽象成 transformation 序列,统一塞进 IngestionPipeline:

from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding

pipeline = IngestionPipeline(transformations=[
    SentenceSplitter(chunk_size=512, chunk_overlap=50),
    OpenAIEmbedding(model="text-embedding-3-small"),
])
nodes = pipeline.run(documents=docs)

每个 transformation 输入 list[Node](Document 也是 Node),输出 list[Node]——这种统一接口让切块器、metadata 抽取器、embedding 可以自由组合。

二、Splitter 家族:五种主力

Splitter按什么切适合不适合
SentenceSplitter句子 + token 预算通用默认,中英都行代码、结构化文档
TokenTextSplitter纯 token极短文本、chat 记录会切断句子语义
SentenceWindowNodeParser句子 + 上下文窗口精准检索 + 大上下文合成节点数翻倍
SemanticSplitterNodeParserembedding 相似度主题漂移的长文,语义自然比字符切慢 5-10x
MarkdownNodeParserh1/h2/h3 标题层级技术文档、Wiki非 Markdown
HierarchicalNodeParser多级粗/细粒度AutoMergingRetriever存储 3-4x 膨胀
CodeSplitterAST 语法节点代码库 RAG非代码

SentenceSplitter — 默认选它

from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(
    chunk_size=512,            # token 预算,不是字符
    chunk_overlap=50,          # 相邻 chunk 重叠 token 数,防断句丢上下文
    separator=" ",              # 分词符
    paragraph_separator="\n\n\n",
    secondary_chunking_regex="[^,.;。?!]+[,.;。?!]?",  # 句子识别
)

MarkdownNodeParser — 技术文档神器

from llama_index.core.node_parser import MarkdownNodeParser

parser = MarkdownNodeParser()
nodes = parser.get_nodes_from_documents(md_docs)

# 每个 node.metadata 里会有:
# {"header_1": "API 参考", "header_2": "用户", "header_3": "创建用户"}
# 这些层级会自动拼进 embedding,检索时对"怎么创建用户"命中率拉满

SemanticSplitter — 按主题切

from llama_index.core.node_parser import SemanticSplitterNodeParser

splitter = SemanticSplitterNodeParser(
    embed_model=embed_model,
    buffer_size=1,                # 比较相邻几个句子
    breakpoint_percentile_threshold=95,  # 95 分位数以上的语义距离就切
)

原理:对每两个相邻句子算 embedding 距离,距离大就在那里切。长文里主题漂移明显时效果最好——但成本是每次 ingestion 都要先跑一遍 embedding 计算距离。

三、chunk_size 和 overlap 怎么选

这是 RAG 最被问烂的问题。标准答案没有,但决策框架有:

chunk_size命中率答案完整度适合
128-256高(chunk 小,语义集中)低(信息少)FAQ 型、短答案场景
512 🔥平衡平衡90% 场景的默认值
1024中(chunk 大,语义稀释)长回答、叙述型文本
2048+最高用 SummaryIndex 不是 VectorStoreIndex
实用建议:
① 不知道选啥——512 / overlap 50,能覆盖大部分场景
② 想精准 → 256 + 用 SentenceWindowNodeParser 扩上下文
③ 想省钱 → 小 chunk(256)+ 高 top_k(10)
④ embedding 模型的 最大输入长度要能装下 chunk——OpenAI 8K token,BGE-M3 支持 8K,小模型可能只有 512

overlap 的作用

overlap 主要防"答案正好在切割线上"的情况——比如一个定义横跨两段,不 overlap 就两边都不完整。经验值:

四、Metadata Extractor:给 chunk 加智能标签

切完的 Node 除了基础 metadata,还可以用 LLM 再抽一层——摘要、标题、关键词、问题候选:

from llama_index.core.extractors import (
    TitleExtractor,                  # 给每段生成标题
    SummaryExtractor,                # 段落摘要(可带左右邻居)
    QuestionsAnsweredExtractor,      # 生成 N 个"这段能回答的问题"
    KeywordExtractor,                # 抽关键词
)

pipeline = IngestionPipeline(transformations=[
    SentenceSplitter(chunk_size=512),
    TitleExtractor(nodes=5),                    # 每组 5 个节点共享一个标题
    QuestionsAnsweredExtractor(questions=3, llm=llm),
    embed_model,
])

真正的秘密在于 QuestionsAnsweredExtractor——它让每个 chunk 带 3 个"本段能回答的假想问题",这些问题被一起 embed,query-document 匹配变成了 query-question 匹配,命中率显著提升。代价是每个 chunk 多 3 次 LLM 调用。

成本提醒:metadata extractor 会对每个 chunk 调一次 LLM。1 万个 chunk × 每次 500 tokens = 5M tokens,一次 ingestion 几美元。只对高价值知识库(客服 FAQ、产品手册)用,海量文档档案库不值当。

五、Embedding 模型选型

模型维度语言MTEB 评分成本/1M token适合
text-embedding-3-small (OpenAI)1536多语62.3$0.02通用首选,性价比王
text-embedding-3-large (OpenAI)3072多语64.6$0.13要最强效果 + 不差钱
BGE-large-zh-v1.5 (BAAI)1024中文主64.5(C-MTEB)自部署中文 RAG 首选,自部署省钱
BGE-M3 (BAAI)1024多语 + 代码自部署中英混合、稀疏+稠密+多向量一起出
Jina-embeddings-v31024多语65+$0.018长文本(8K),长上下文
voyage-3 (Anthropic 系)1024多语$0.06代码/法律/金融垂直
Cohere embed-v31024多语$0.10带重排的全家桶
nomic-embed-text768英文主开源免费Ollama 本地部署小模型

选型决策树

  1. 纯中文场景 → BGE-large-zh-v1.5 或 BGE-M3(自部署),退而求其次 text-embedding-3-small
  2. 中英混合 → BGE-M3 或 text-embedding-3-small
  3. 要高性价比 + 不想自部署 → text-embedding-3-small,没得说
  4. 要最高精度 → text-embedding-3-large 或 voyage-3
  5. 长文档(单 chunk > 512 token)→ Jina v3(8K)、OpenAI v3(8K),别用 BGE(512)
  6. 代码检索 → voyage-code-3 或 jina-embeddings-v2-code
  7. 完全离线 → Ollama + nomic-embed-text 或 BGE-large

接入示例

# OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
embed = OpenAIEmbedding(model="text-embedding-3-small", embed_batch_size=100)

# 本地 BGE(HuggingFace)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
embed = HuggingFaceEmbedding(
    model_name="BAAI/bge-large-zh-v1.5",
    device="cuda",               # 或 mps / cpu
    embed_batch_size=32,
    query_instruction="为这个句子生成表示以用于检索相关文章:",
)

# Ollama 本地
from llama_index.embeddings.ollama import OllamaEmbedding
embed = OllamaEmbedding(model_name="nomic-embed-text", base_url="http://localhost:11434")

# 全局设置(推荐)
from llama_index.core import Settings
Settings.embed_model = embed
BGE 的 query_instruction:BGE 家族训练时 query 侧加了前缀,不加的话效果明显下降。用 HuggingFaceEmbedding 时记得设置(LlamaIndex 默认会自动加,但自定义模型要注意)。

并发与 batch 调优

六、Pipeline 缓存:快速迭代的关键

调整参数时,每次重跑全量嵌入既慢又烧钱。IngestionPipeline 有个内置 transformation 缓存——同样的输入 + 同样的 transformation hash 会命中缓存:

from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.storage.kvstore.redis import RedisKVStore

cache = IngestionCache(
    cache=RedisKVStore.from_host_and_port("localhost", 6379),
    collection="llama_cache",
)

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=512), embed_model],
    cache=cache,
)
nodes = pipeline.run(documents=docs)
pipeline.persist("./pipeline_storage")

命中机制:transformation 对每个输入 Node 算 hash,命中直接返回缓存输出。改 chunk_size 会导致 SentenceSplitter 的配置 hash 变,但下游 embedding 如果输入文本没变还是能命中。

七、增量索引:DocstoreStrategy 详解

Ch3 提过 UPSERTS,这里讲清楚所有策略:

from llama_index.core.ingestion import DocstoreStrategy
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[...],
    docstore=SimpleDocumentStore(),
    docstore_strategy=DocstoreStrategy.UPSERTS,  # 看下表
    vector_store=vector_store,
)
策略行为适合
DUPLICATES_ONLY(默认)doc_id 已存在就跳过,不管 hash 变没变一次性导入,静态数据
UPSERTS同 doc_id 不同 hash → 删旧 node + 插新 node🔥 日常增量首选
UPSERTS_AND_DELETEUPSERTS + 本次没出现在输入的 doc_id → 删除定时全扫的 Notion/Confluence,要同步删除

增量流程示例

# 第一次跑:全量
pipeline.run(documents=all_docs)
pipeline.persist("./pipeline")

# 第二天重新拉 Notion,得到新的 docs_today(可能有新增、修改、删除)
pipeline = IngestionPipeline.load("./pipeline")
# 策略设为 UPSERTS_AND_DELETE
pipeline.docstore_strategy = DocstoreStrategy.UPSERTS_AND_DELETE
pipeline.run(documents=docs_today)
# 自动:新增/修改 → 重嵌入;今天没出现的老 doc_id → 从 vector store 删除

八、并行与分布式

nodes = pipeline.run(
    documents=docs,
    num_workers=8,                  # 多进程
    show_progress=True,
)

几个要点:

九、端到端一个完整 Pipeline

from llama_index.core import Settings, SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy, IngestionCache
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor, QuestionsAnsweredExtractor
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient

Settings.llm = OpenAI(model="gpt-4o-mini")
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

vector_store = QdrantVectorStore(
    client=QdrantClient(url="http://localhost:6333"),
    collection_name="knowledge_v1",
)

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=512, chunk_overlap=50),
        TitleExtractor(nodes=5),
        QuestionsAnsweredExtractor(questions=3),
        Settings.embed_model,
    ],
    docstore=SimpleDocumentStore.from_persist_dir("./pipeline") if os.path.exists("./pipeline") else SimpleDocumentStore(),
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,
    vector_store=vector_store,
    cache=IngestionCache(),
)

docs = SimpleDirectoryReader("./data", filename_as_id=True).load_data()
nodes = pipeline.run(documents=docs, num_workers=4, show_progress=True)
pipeline.persist("./pipeline")

print(f"入库 {len(nodes)} 个 node")

十、常见坑

  1. chunk_size 用字符数当 token:SentenceSplitter 是按 token,1 字符 ≠ 1 token,英文 1 token ≈ 4 字符、中文 1 token ≈ 1-2 字符。
  2. MarkdownNodeParser 的 metadata 没设 excluded_embed_metadata_keys:header_1/2/3 本身会被拼进 embedding 是好事,但如果太长(比如中文标题拉一整行)会稀释内容。
  3. SemanticSplitter 用了 OpenAI embedding:每次 ingestion 都多一批调用,用本地 BGE 跑语义切块,用 OpenAI embed 入库 才是经济做法。
  4. Extractor 忘了限制范围:TitleExtractor/QuestionsAnsweredExtractor 默认对所有 chunk 调 LLM——海量数据几百刀一次。先抽样跑一批评估效果。
  5. 不开 Pipeline cache:调参时每次重跑全量 embed,慢 + 贵。
  6. embed_batch_size 太小:默认 10,OpenAI 其实支持 100+ 一批,改一下吞吐翻 10 倍。
  7. BGE 中文没设 query_instruction:检索准确率掉一截。
  8. 换了 embedding 模型不重建索引:维度和语义空间全变了,查询会胡乱召回——必须重嵌入。
  9. num_workers 开太多触发限流:OpenAI RPM/TPM 上限,并发失败率上升反而慢。
  10. Docstore 没 persist:下次跑没老 hash,UPSERTS 退化成全量重建。

十一、本章小结

记住:
① 默认组合:SentenceSplitter(512/50) + text-embedding-3-small——90% 场景够用。
② Markdown 必用 MarkdownNodeParser,代码必用 CodeSplitter,主题漂移的长文考虑 SemanticSplitter。
③ Metadata Extractor(特别是 QuestionsAnsweredExtractor)能显著提升命中率,但成本每 chunk 一次 LLM——给高价值数据用。
④ Pipeline cache + DocstoreStrategy.UPSERTS_AND_DELETE 是生产环境增量索引的标配,省时省钱还同步删除。