LangChain 0.3 架构演进
LangChain 0.3 是一次重要的架构重整,引入了更清晰的包结构和 LCEL(LangChain Expression Language)作为一等公民。理解这次变化对于写出现代化的 LangChain 代码至关重要。
LCEL
LangChain Expression Language,LangChain 的链式组合语言。使用 | 管道符将 Runnable 组件串联,支持流式、批量、异步调用,自动并行化和内置重试机制。
Runnable
LangChain 0.3 的核心接口,所有组件(提示模板、LLM、检索器、工具)都实现 Runnable 接口,统一提供 invoke、batch、stream、ainvoke 方法。
RunnablePassthrough
将输入直接传递到下一步的特殊 Runnable,常用于在链中保留原始查询,同时让其他分支处理检索等任务。
RunnableParallel
并行执行多个 Runnable,将各自结果合并为字典。在 RAG 中常用于同时准备"上下文"和"问题"两个输入。
ChatMessageHistory
存储对话历史的组件,支持内存存储、Redis、DynamoDB、PostgreSQL 等多种后端。与 RunnableWithMessageHistory 结合实现有状态的对话 RAG。
LCEL 基础:构建 RAG 链
# pip install langchain langchain-openai langchain-qdrant
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from qdrant_client import QdrantClient
# 初始化组件
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# Qdrant 向量存储
qdrant = QdrantVectorStore(
client=QdrantClient(host="localhost", port=6333),
collection_name="rag_knowledge_base",
embedding=embeddings,
)
retriever = qdrant.as_retriever(
search_type="similarity",
search_kwargs={"k": 4, "score_threshold": 0.5},
)
# RAG 提示模板
RAG_PROMPT = ChatPromptTemplate.from_template("""你是专业的技术助手,基于提供的上下文准确回答问题。
如果上下文中没有足够信息,请如实说明,不要编造内容。
上下文:
{context}
问题:{question}
答案(请引用相关内容并注明来源):""")
def format_docs(docs) -> str:
"""将检索到的文档格式化为字符串"""
return "\n\n".join(
f"[来源 {i+1}: {doc.metadata.get('source', '未知')}]\n{doc.page_content}"
for i, doc in enumerate(docs)
)
# 构建 LCEL RAG 链
rag_chain = (
RunnableParallel(
context=retriever | format_docs, # 检索 + 格式化
question=RunnablePassthrough() # 直接传递问题
)
| RAG_PROMPT
| llm
| StrOutputParser()
)
# 调用
answer = rag_chain.invoke("RAG 和微调的核心区别是什么?")
print(answer)
流式输出
# 流式输出(逐 token 打印)
for chunk in rag_chain.stream("请详细解释 HNSW 索引的工作原理"):
print(chunk, end="", flush=True)
print()
# 异步流式(FastAPI 中使用)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/ask")
async def ask_stream(question: str):
async def generate():
async for chunk in rag_chain.astream(question):
yield f"data: {chunk}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
对话式 RAG(ConversationalRetrievalChain)
多轮对话中,用户往往会用代词("它"、"这个")指代前文提到的概念。对话式 RAG 需要历史感知的查询改写——将上下文历史融合进检索查询中。
from langchain_core.prompts import MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
# ── 步骤1:历史感知检索器 ──────────────────
contextualize_prompt = ChatPromptTemplate.from_messages([
("system", """根据对话历史和最新用户问题,
将问题改写为独立的搜索查询(不依赖历史上下文)。
如果问题已经独立完整,直接返回原问题。"""),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
# 历史感知检索链
history_aware_retriever = (
contextualize_prompt
| llm
| StrOutputParser()
| retriever
)
# ── 步骤2:带历史的问答链 ──────────────────
qa_prompt = ChatPromptTemplate.from_messages([
("system", """你是专业技术助手。基于以下上下文回答问题。
上下文:{context}"""),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
qa_chain = (
RunnableParallel(
context=history_aware_retriever | format_docs,
input=RunnablePassthrough(),
chat_history=lambda x: x["chat_history"],
)
| qa_prompt
| llm
| StrOutputParser()
)
# ── 步骤3:注入记忆管理 ───────────────────
store = {} # 实际生产用 Redis 等持久化存储
def get_session_history(session_id: str) -> ChatMessageHistory:
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
conversational_rag = RunnableWithMessageHistory(
qa_chain,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="output",
)
# 多轮对话测试
config = {"configurable": {"session_id": "user_001"}}
r1 = conversational_rag.invoke(
{"input": "RAG 是什么?"}, config=config
)
print("Q1:", r1)
r2 = conversational_rag.invoke(
{"input": "它的主要优势有哪些?"}, # "它" 指代 RAG
config=config
)
print("Q2:", r2)
混合检索 LCEL 链
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# BM25 检索器(来自原始文档列表)
bm25_retriever = BM25Retriever.from_documents(
documents,
k=20
)
# 向量检索器
vector_retriever = qdrant.as_retriever(search_kwargs={"k": 20})
# 混合检索:0.4 BM25 + 0.6 向量
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, vector_retriever],
weights=[0.4, 0.6],
)
# 加上 Reranker
cross_encoder = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
reranker = CrossEncoderReranker(model=cross_encoder, top_n=5)
advanced_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=ensemble_retriever,
)
# 组装完整 Advanced RAG 链
advanced_rag = (
RunnableParallel(
context=advanced_retriever | format_docs,
question=RunnablePassthrough(),
)
| RAG_PROMPT
| llm
| StrOutputParser()
)
Redis 持久化对话记忆
# pip install langchain-redis redis
from langchain_redis import RedisChatMessageHistory
def get_redis_history(session_id: str) -> RedisChatMessageHistory:
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379",
ttl=3600 * 24 * 7, # 7天过期
key_prefix="rag_chat:",
)
# 记忆窗口管理(只保留最近 N 条消息)
from langchain_core.runnables import RunnableLambda
def trim_messages(messages, max_messages: int = 10):
"""只保留最近 10 条对话,避免 context 过长"""
if len(messages) > max_messages:
return messages[-max_messages:]
return messages
对话记忆管理策略
窗口记忆(Window Memory):只保留最近 N 轮,实现简单,适合大多数场景。摘要记忆(Summary Memory):用 LLM 压缩历史为摘要,适合超长对话。知识图谱记忆(KG Memory):将历史中的实体关系存为图,适合需要跨轮推理的复杂场景。
Self-Query 自然语言过滤
让 LLM 自动从用户问题中提取过滤条件,实现"只搜索 2024 年后的工程部文档"这样的自然语言检索。
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
# 描述文档的元数据结构
metadata_field_info = [
AttributeInfo(
name="source",
description="文档来源文件名",
type="string",
),
AttributeInfo(
name="department",
description="所属部门:engineering 或 research",
type="string",
),
AttributeInfo(
name="chapter",
description="章节编号,1-10",
type="integer",
),
]
self_query_retriever = SelfQueryRetriever.from_llm(
llm=llm,
vectorstore=qdrant,
document_contents="RAG 和向量数据库相关技术文档",
metadata_field_info=metadata_field_info,
verbose=True,
)
# 自动解析过滤条件
docs = self_query_retriever.invoke(
"找 engineering 部门第3章以后的内容"
)
# LLM 自动转换为:department == "engineering" AND chapter > 3
本章总结
- LCEL 用 | 管道符组合 Runnable,代码简洁,自动支持流式、批量、异步
- 对话 RAG 的关键:历史感知检索(将代词指代转化为独立查询)+ 记忆管理
- 混合检索链:BM25 + 向量 → EnsembleRetriever → CrossEncoderReranker
- Redis 持久化记忆 + 消息窗口控制是生产对话 RAG 的标配