Chapter 08

LangChain RAG 进阶

掌握 LangChain 0.3+ 的 LCEL 链式组合、对话式 RAG、记忆管理与流式输出,构建交互式 AI 应用

LangChain 0.3 架构演进

LangChain 0.3 是一次重要的架构重整,引入了更清晰的包结构和 LCEL(LangChain Expression Language)作为一等公民。理解这次变化对于写出现代化的 LangChain 代码至关重要。

LCEL
LangChain Expression Language,LangChain 的链式组合语言。使用 | 管道符将 Runnable 组件串联,支持流式、批量、异步调用,自动并行化和内置重试机制。
Runnable
LangChain 0.3 的核心接口,所有组件(提示模板、LLM、检索器、工具)都实现 Runnable 接口,统一提供 invoke、batch、stream、ainvoke 方法。
RunnablePassthrough
将输入直接传递到下一步的特殊 Runnable,常用于在链中保留原始查询,同时让其他分支处理检索等任务。
RunnableParallel
并行执行多个 Runnable,将各自结果合并为字典。在 RAG 中常用于同时准备"上下文"和"问题"两个输入。
ChatMessageHistory
存储对话历史的组件,支持内存存储、Redis、DynamoDB、PostgreSQL 等多种后端。与 RunnableWithMessageHistory 结合实现有状态的对话 RAG。

LCEL 基础:构建 RAG 链

# pip install langchain langchain-openai langchain-qdrant
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from qdrant_client import QdrantClient

# 初始化组件
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Qdrant 向量存储
qdrant = QdrantVectorStore(
    client=QdrantClient(host="localhost", port=6333),
    collection_name="rag_knowledge_base",
    embedding=embeddings,
)
retriever = qdrant.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4, "score_threshold": 0.5},
)

# RAG 提示模板
RAG_PROMPT = ChatPromptTemplate.from_template("""你是专业的技术助手,基于提供的上下文准确回答问题。
如果上下文中没有足够信息,请如实说明,不要编造内容。

上下文:
{context}

问题:{question}

答案(请引用相关内容并注明来源):""")

def format_docs(docs) -> str:
    """将检索到的文档格式化为字符串"""
    return "\n\n".join(
        f"[来源 {i+1}: {doc.metadata.get('source', '未知')}]\n{doc.page_content}"
        for i, doc in enumerate(docs)
    )

# 构建 LCEL RAG 链
rag_chain = (
    RunnableParallel(
        context=retriever | format_docs,   # 检索 + 格式化
        question=RunnablePassthrough()     # 直接传递问题
    )
    | RAG_PROMPT
    | llm
    | StrOutputParser()
)

# 调用
answer = rag_chain.invoke("RAG 和微调的核心区别是什么?")
print(answer)

流式输出

# 流式输出(逐 token 打印)
for chunk in rag_chain.stream("请详细解释 HNSW 索引的工作原理"):
    print(chunk, end="", flush=True)
print()

# 异步流式(FastAPI 中使用)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/ask")
async def ask_stream(question: str):
    async def generate():
        async for chunk in rag_chain.astream(question):
            yield f"data: {chunk}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

对话式 RAG(ConversationalRetrievalChain)

多轮对话中,用户往往会用代词("它"、"这个")指代前文提到的概念。对话式 RAG 需要历史感知的查询改写——将上下文历史融合进检索查询中。

from langchain_core.prompts import MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

# ── 步骤1:历史感知检索器 ──────────────────
contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", """根据对话历史和最新用户问题,
将问题改写为独立的搜索查询(不依赖历史上下文)。
如果问题已经独立完整,直接返回原问题。"""),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
])

# 历史感知检索链
history_aware_retriever = (
    contextualize_prompt
    | llm
    | StrOutputParser()
    | retriever
)

# ── 步骤2:带历史的问答链 ──────────────────
qa_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是专业技术助手。基于以下上下文回答问题。
上下文:{context}"""),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
])

qa_chain = (
    RunnableParallel(
        context=history_aware_retriever | format_docs,
        input=RunnablePassthrough(),
        chat_history=lambda x: x["chat_history"],
    )
    | qa_prompt
    | llm
    | StrOutputParser()
)

# ── 步骤3:注入记忆管理 ───────────────────
store = {}  # 实际生产用 Redis 等持久化存储

def get_session_history(session_id: str) -> ChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

conversational_rag = RunnableWithMessageHistory(
    qa_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
    output_messages_key="output",
)

# 多轮对话测试
config = {"configurable": {"session_id": "user_001"}}

r1 = conversational_rag.invoke(
    {"input": "RAG 是什么?"}, config=config
)
print("Q1:", r1)

r2 = conversational_rag.invoke(
    {"input": "它的主要优势有哪些?"},  # "它" 指代 RAG
    config=config
)
print("Q2:", r2)

混合检索 LCEL 链

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

# BM25 检索器(来自原始文档列表)
bm25_retriever = BM25Retriever.from_documents(
    documents,
    k=20
)

# 向量检索器
vector_retriever = qdrant.as_retriever(search_kwargs={"k": 20})

# 混合检索:0.4 BM25 + 0.6 向量
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.4, 0.6],
)

# 加上 Reranker
cross_encoder = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
reranker = CrossEncoderReranker(model=cross_encoder, top_n=5)

advanced_retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=ensemble_retriever,
)

# 组装完整 Advanced RAG 链
advanced_rag = (
    RunnableParallel(
        context=advanced_retriever | format_docs,
        question=RunnablePassthrough(),
    )
    | RAG_PROMPT
    | llm
    | StrOutputParser()
)

Redis 持久化对话记忆

# pip install langchain-redis redis
from langchain_redis import RedisChatMessageHistory

def get_redis_history(session_id: str) -> RedisChatMessageHistory:
    return RedisChatMessageHistory(
        session_id=session_id,
        url="redis://localhost:6379",
        ttl=3600 * 24 * 7,  # 7天过期
        key_prefix="rag_chat:",
    )

# 记忆窗口管理(只保留最近 N 条消息)
from langchain_core.runnables import RunnableLambda

def trim_messages(messages, max_messages: int = 10):
    """只保留最近 10 条对话,避免 context 过长"""
    if len(messages) > max_messages:
        return messages[-max_messages:]
    return messages
对话记忆管理策略

窗口记忆(Window Memory):只保留最近 N 轮,实现简单,适合大多数场景。摘要记忆(Summary Memory):用 LLM 压缩历史为摘要,适合超长对话。知识图谱记忆(KG Memory):将历史中的实体关系存为图,适合需要跨轮推理的复杂场景。

Self-Query 自然语言过滤

让 LLM 自动从用户问题中提取过滤条件,实现"只搜索 2024 年后的工程部文档"这样的自然语言检索。

from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

# 描述文档的元数据结构
metadata_field_info = [
    AttributeInfo(
        name="source",
        description="文档来源文件名",
        type="string",
    ),
    AttributeInfo(
        name="department",
        description="所属部门:engineering 或 research",
        type="string",
    ),
    AttributeInfo(
        name="chapter",
        description="章节编号,1-10",
        type="integer",
    ),
]

self_query_retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=qdrant,
    document_contents="RAG 和向量数据库相关技术文档",
    metadata_field_info=metadata_field_info,
    verbose=True,
)

# 自动解析过滤条件
docs = self_query_retriever.invoke(
    "找 engineering 部门第3章以后的内容"
)
# LLM 自动转换为:department == "engineering" AND chapter > 3

本章总结