LCEL 的设计哲学
LCEL(LangChain Expression Language) 是 LangChain 0.2.x 引入、0.3.x 全面推广的声明式管道 DSL。它的核心思想来自 Unix 管道哲学:将小的、单一职责的组件通过 | 操作符串联,每个组件的输出成为下一个组件的输入。
相比旧式 LLMChain,LCEL 解决了三个关键问题:
LCEL 优势
- 天然支持流式输出(不需要额外配置)
- 原生异步支持(astream / ainvoke / abatch)
- RunnableParallel 并行执行多个子链
- 完整的 LangSmith 追踪集成
- 简洁的声明式语法,易于阅读和维护
旧式 Chain 痛点
- 流式需要单独处理 callbacks
- 组合困难,需要大量嵌套配置
- 类继承体系复杂,调试困难
- 并行执行没有统一抽象
- 不同 Chain 的接口不一致
Runnable 接口:万物之基
LCEL 的基础是 Runnable 协议。任何实现了该协议的对象都可以用 | 串联。Runnable 提供以下标准方法:
| 操作符:串联 Runnable
LCEL 的 | 操作符本质上调用了 RunnableSequence——将左侧 Runnable 的输出传入右侧 Runnable 的输入。
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 定义三个 Runnable 组件
prompt = ChatPromptTemplate.from_template("用一段话解释:{topic}")
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# 用 | 串联成链
chain = prompt | model | parser
# invoke:单次调用
result = chain.invoke({"topic": "量子纠缠"})
print(result)
# stream:流式输出
for chunk in chain.stream({"topic": "黑洞"}):
print(chunk, end="", flush=True)
# batch:批量调用
results = chain.batch([
{"topic": "相对论"},
{"topic": "量子力学"},
{"topic": "弦理论"},
], config={"max_concurrency": 3}) # 最大并发数
链中每一步的输入/输出类型需要匹配:
dict → ChatPromptTemplate → list[BaseMessage] → ChatModel → AIMessage → StrOutputParser → str
如果类型不匹配(比如把字符串传给期望 dict 的 PromptTemplate),运行时会抛出 TypeError。使用 RunnableLambda 可以做类型转换。
RunnableParallel:并行执行
RunnableParallel 将多个 Runnable 并行执行,每个接收相同的输入,结果组合成一个字典输出。这在 RAG 系统中非常有用(同时检索多个来源)。
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# 示例:同时生成翻译和摘要
translate_prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文:{text}")
summary_prompt = ChatPromptTemplate.from_template("用一句话总结以下文本:{text}")
parallel_chain = RunnableParallel(
translation=(translate_prompt | model | parser),
summary=(summary_prompt | model | parser),
original=RunnablePassthrough(), # 直接透传原始输入
)
result = parallel_chain.invoke({"text": "大语言模型改变了软件开发的方式。"})
print(result["translation"]) # 英文翻译
print(result["summary"]) # 中文摘要
print(result["original"]) # {"text": "大语言模型..."}
使用字典语法(等价简写)
# 字典字面量在 LCEL 中自动转换为 RunnableParallel
chain = {
"context": retriever, # 检索器
"question": RunnablePassthrough(), # 透传问题
} | rag_prompt | model | parser
# 等价于:
# RunnableParallel(context=retriever, question=RunnablePassthrough())
# | rag_prompt | model | parser
RunnablePassthrough 与 RunnableLambda
这两个工具类是 LCEL 管道的"粘合剂":
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
# RunnablePassthrough:透传输入,不做任何处理
# 常用于在 RunnableParallel 中保留某个字段
# RunnableLambda:将普通函数包装为 Runnable
def uppercase(text: str) -> str:
return text.upper()
chain = prompt | model | parser | RunnableLambda(uppercase)
# 或者使用 lambda 语法
chain = prompt | model | parser | RunnableLambda(lambda x: x.upper())
# assign():在字典中添加新字段(不破坏原有字段)
chain = RunnableParallel(question=RunnablePassthrough()).assign(
answer=(prompt | model | parser)
)
result = chain.invoke({"question": "什么是 RAG?"})
# {"question": "什么是 RAG?", "answer": "RAG 是..."}
RunnablePassthrough.assign():构建 RAG 数据流
在 RAG 系统中,assign() 方法非常实用——它允许在管道的字典中逐步添加新字段,同时保留已有字段:
# 经典 RAG 链构建模式
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
rag_prompt = ChatPromptTemplate.from_template("""
根据以下上下文回答问题:
上下文:
{context}
问题:{question}
请基于上下文中的信息回答,如果上下文中没有相关信息,请说"我不知道"。
""")
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
rag_chain = (
{
"context": retriever | RunnableLambda(format_docs),
"question": RunnablePassthrough(),
}
| rag_prompt
| model
| parser
)
answer = rag_chain.invoke("LangChain 是什么?")
流式输出的工作原理
LCEL 链的流式能力是自动传播的:只要链中有一个 Runnable 支持流式(如 ChatModel),整个链就会以流式方式运行。
# 同步流式
for chunk in chain.stream({"topic": "机器学习"}):
print(chunk, end="", flush=True)
print()
# 异步流式(在 FastAPI 中使用)
import asyncio
async def stream_response():
async for chunk in chain.astream({"topic": "深度学习"}):
print(chunk, end="", flush=True)
asyncio.run(stream_response())
# stream_events():获取细粒度事件流(适合前端展示)
async for event in chain.astream_events({"topic": "NLP"}, version="v2"):
if event["event"] == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)
RunnableConfig:运行时配置
每个 Runnable 调用都可以传入 config 参数,用于覆盖运行时行为:
# 常用配置项
chain.invoke(
{"topic": "Python"},
config={
"run_name": "explain-python", # LangSmith 中显示的名称
"tags": ["production", "v2"], # 追踪标签
"metadata": {"user_id": "u123"}, # 追踪元数据
"max_concurrency": 5, # batch 最大并发数
"callbacks": [], # 自定义回调
}
)
bind():固定 Runnable 参数
bind() 方法允许在不修改原始 Runnable 的情况下,预先绑定部分参数:
# 为特定链绑定更严格的 temperature
precise_model = model.bind(temperature=0, max_tokens=200)
# 绑定工具到模型(第7章的 Tool Calling)
model_with_tools = model.bind_tools([search_tool, calculator_tool])
# 绑定停止词
model_with_stop = model.bind(stop=["END", "---"])
# 在链中使用绑定后的模型
precise_chain = prompt | precise_model | parser
a | b | c 实际上创建了一个 RunnableSequence(first=a, middle=[b], last=c) 对象。你也可以显式创建:RunnableSequence(first=prompt, middle=[model], last=parser)。两种方式完全等价,推荐使用 | 语法,更简洁直观。
本章小结
- LCEL 是 LangChain 0.3.x 的核心:基于 Runnable 接口,用
|操作符声明式地串联组件 - 所有 Runnable 统一支持
invoke / stream / batch / ainvoke / astream / abatch RunnableParallel/ 字典字面量实现并行执行多个子链RunnablePassthrough透传输入,RunnableLambda将任意函数包装为 Runnable- 流式输出在 LCEL 链中自动传播,无需额外配置
bind()预绑定参数,.assign()在字典中增量添加字段