Chapter 03

LCEL 链式管道

掌握 LangChain 0.3.x 的核心:Runnable 接口、| 操作符组合、并行执行与流式输出

LCEL 的设计哲学

LCEL(LangChain Expression Language) 是 LangChain 0.2.x 引入、0.3.x 全面推广的声明式管道 DSL。它的核心思想来自 Unix 管道哲学:将小的、单一职责的组件通过 | 操作符串联,每个组件的输出成为下一个组件的输入。

相比旧式 LLMChain,LCEL 解决了三个关键问题:

LCEL 优势

  • 天然支持流式输出(不需要额外配置)
  • 原生异步支持(astream / ainvoke / abatch)
  • RunnableParallel 并行执行多个子链
  • 完整的 LangSmith 追踪集成
  • 简洁的声明式语法,易于阅读和维护

旧式 Chain 痛点

  • 流式需要单独处理 callbacks
  • 组合困难,需要大量嵌套配置
  • 类继承体系复杂,调试困难
  • 并行执行没有统一抽象
  • 不同 Chain 的接口不一致

Runnable 接口:万物之基

LCEL 的基础是 Runnable 协议。任何实现了该协议的对象都可以用 | 串联。Runnable 提供以下标准方法:

invoke(input)
同步单次调用,返回完整输出。适合简单脚本和测试。
stream(input)
同步流式迭代器,逐步返回输出 chunk。适合终端打印进度。
batch(inputs)
批量并发调用,传入列表,返回结果列表。内置并发控制(max_concurrency 参数)。
ainvoke / astream / abatch
以上三个方法的异步版本,在 async/await 环境中使用(FastAPI、Jupyter 等)。

| 操作符:串联 Runnable

LCEL 的 | 操作符本质上调用了 RunnableSequence——将左侧 Runnable 的输出传入右侧 Runnable 的输入。

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 定义三个 Runnable 组件
prompt = ChatPromptTemplate.from_template("用一段话解释:{topic}")
model  = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()

# 用 | 串联成链
chain = prompt | model | parser

# invoke:单次调用
result = chain.invoke({"topic": "量子纠缠"})
print(result)

# stream:流式输出
for chunk in chain.stream({"topic": "黑洞"}):
    print(chunk, end="", flush=True)

# batch:批量调用
results = chain.batch([
    {"topic": "相对论"},
    {"topic": "量子力学"},
    {"topic": "弦理论"},
], config={"max_concurrency": 3})  # 最大并发数
数据流动的类型

链中每一步的输入/输出类型需要匹配:
dict → ChatPromptTemplate → list[BaseMessage] → ChatModel → AIMessage → StrOutputParser → str
如果类型不匹配(比如把字符串传给期望 dict 的 PromptTemplate),运行时会抛出 TypeError。使用 RunnableLambda 可以做类型转换。

RunnableParallel:并行执行

RunnableParallel 将多个 Runnable 并行执行,每个接收相同的输入,结果组合成一个字典输出。这在 RAG 系统中非常有用(同时检索多个来源)。

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 示例:同时生成翻译和摘要
translate_prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文:{text}")
summary_prompt   = ChatPromptTemplate.from_template("用一句话总结以下文本:{text}")

parallel_chain = RunnableParallel(
    translation=(translate_prompt | model | parser),
    summary=(summary_prompt | model | parser),
    original=RunnablePassthrough(),  # 直接透传原始输入
)

result = parallel_chain.invoke({"text": "大语言模型改变了软件开发的方式。"})
print(result["translation"])  # 英文翻译
print(result["summary"])      # 中文摘要
print(result["original"])    # {"text": "大语言模型..."}

使用字典语法(等价简写)

# 字典字面量在 LCEL 中自动转换为 RunnableParallel
chain = {
    "context": retriever,              # 检索器
    "question": RunnablePassthrough(), # 透传问题
} | rag_prompt | model | parser

# 等价于:
# RunnableParallel(context=retriever, question=RunnablePassthrough())
# | rag_prompt | model | parser

RunnablePassthrough 与 RunnableLambda

这两个工具类是 LCEL 管道的"粘合剂":

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# RunnablePassthrough:透传输入,不做任何处理
# 常用于在 RunnableParallel 中保留某个字段

# RunnableLambda:将普通函数包装为 Runnable
def uppercase(text: str) -> str:
    return text.upper()

chain = prompt | model | parser | RunnableLambda(uppercase)

# 或者使用 lambda 语法
chain = prompt | model | parser | RunnableLambda(lambda x: x.upper())

# assign():在字典中添加新字段(不破坏原有字段)
chain = RunnableParallel(question=RunnablePassthrough()).assign(
    answer=(prompt | model | parser)
)
result = chain.invoke({"question": "什么是 RAG?"})
# {"question": "什么是 RAG?", "answer": "RAG 是..."}

RunnablePassthrough.assign():构建 RAG 数据流

在 RAG 系统中,assign() 方法非常实用——它允许在管道的字典中逐步添加新字段,同时保留已有字段:

# 经典 RAG 链构建模式
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

rag_prompt = ChatPromptTemplate.from_template("""
根据以下上下文回答问题:

上下文:
{context}

问题:{question}

请基于上下文中的信息回答,如果上下文中没有相关信息,请说"我不知道"。
""")

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {
        "context": retriever | RunnableLambda(format_docs),
        "question": RunnablePassthrough(),
    }
    | rag_prompt
    | model
    | parser
)

answer = rag_chain.invoke("LangChain 是什么?")

流式输出的工作原理

LCEL 链的流式能力是自动传播的:只要链中有一个 Runnable 支持流式(如 ChatModel),整个链就会以流式方式运行。

# 同步流式
for chunk in chain.stream({"topic": "机器学习"}):
    print(chunk, end="", flush=True)
print()

# 异步流式(在 FastAPI 中使用)
import asyncio

async def stream_response():
    async for chunk in chain.astream({"topic": "深度学习"}):
        print(chunk, end="", flush=True)

asyncio.run(stream_response())

# stream_events():获取细粒度事件流(适合前端展示)
async for event in chain.astream_events({"topic": "NLP"}, version="v2"):
    if event["event"] == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)

RunnableConfig:运行时配置

每个 Runnable 调用都可以传入 config 参数,用于覆盖运行时行为:

# 常用配置项
chain.invoke(
    {"topic": "Python"},
    config={
        "run_name": "explain-python",   # LangSmith 中显示的名称
        "tags": ["production", "v2"],    # 追踪标签
        "metadata": {"user_id": "u123"}, # 追踪元数据
        "max_concurrency": 5,            # batch 最大并发数
        "callbacks": [],                  # 自定义回调
    }
)

bind():固定 Runnable 参数

bind() 方法允许在不修改原始 Runnable 的情况下,预先绑定部分参数:

# 为特定链绑定更严格的 temperature
precise_model = model.bind(temperature=0, max_tokens=200)

# 绑定工具到模型(第7章的 Tool Calling)
model_with_tools = model.bind_tools([search_tool, calculator_tool])

# 绑定停止词
model_with_stop = model.bind(stop=["END", "---"])

# 在链中使用绑定后的模型
precise_chain = prompt | precise_model | parser
LCEL 与 RunnableSequence 的关系

a | b | c 实际上创建了一个 RunnableSequence(first=a, middle=[b], last=c) 对象。你也可以显式创建:RunnableSequence(first=prompt, middle=[model], last=parser)。两种方式完全等价,推荐使用 | 语法,更简洁直观。

本章小结