Chapter 04

SDK 接入:Python / TS / OpenTelemetry 三路

Langfuse 给了三套"把 trace 打进去"的路子:高层装饰器、框架自动 instrument、OpenTelemetry 原生。选哪种,看你的代码长什么样。

三条路子总览

路子代码侵入灵活度适合
A. 低层 SDK(trace/generation/span)手工埋点★★★★★自写 LLM 调用、完全掌控 span 形状
B. @observe 装饰器加一行装饰器★★★★Python 函数边界清晰、结构化代码
C. 框架自动 instrument一行 import★★★已经用 LangChain / LlamaIndex / OpenAI SDK
D. OpenTelemetry OTLP配 OTel SDK★★★★已有 OTel 基础设施,想共用 pipeline

不同路子可以 同时存在——同一个项目里,主流程用 @observe,某个慢函数临时加一段低层 span 细查细节,完全 OK。

A 路:低层 SDK

最原始、最灵活,其他方式本质都是对它的包装。Python:

from langfuse import Langfuse

lf = Langfuse()

trace = lf.trace(
    name="customer-chat",
    user_id="u_42",
    session_id="s_abc",
    metadata={"channel": "whatsapp"},
    tags=["prod", "v2"],
)

# 1) retrieval span
retr = trace.span(name="retrieval", input={"query": query})
docs = search_vector_db(query)
retr.end(output={"docs": docs})

# 2) LLM generation (关键 span 类型,单独记 token + cost)
gen = trace.generation(
    name="answer-llm",
    model="gpt-4o",
    input=[{"role": "user", "content": query}],
)
rsp = openai.chat.completions.create(...)
gen.end(
    output=rsp.choices[0].message.content,
    usage={
        "input": rsp.usage.prompt_tokens,
        "output": rsp.usage.completion_tokens,
    },
)

trace.update(output={"final": rsp.choices[0].message.content})
lf.flush()

三种 span 类型要分清:

generation
专门代表一次 LLM 调用。必须带 model,支持 usage(input/output token)。Langfuse 会据此自动算 cost。
span
通用耗时段,不是 LLM 的都用它:向量检索、DB 查询、外部 API 调用、自己写的任意逻辑。
event
瞬时事件,没时长概念。适合记"用户点了按钮"、"命中了 guardrail"这种点事件。

B 路:@observe 装饰器(Python)

如果你代码是函数式组织的,这招最省事:

from langfuse.decorators import observe, langfuse_context

@observe(name="retrieval")
def retrieve(query: str) -> list:
    return search_vector_db(query)

@observe(as_type="generation", name="answer")
def answer(query: str, docs: list) -> str:
    rsp = openai.chat.completions.create(
        model="gpt-4o",
        messages=build_messages(query, docs),
    )
    langfuse_context.update_current_observation(
        model="gpt-4o",
        usage={
            "input": rsp.usage.prompt_tokens,
            "output": rsp.usage.completion_tokens,
        },
    )
    return rsp.choices[0].message.content

@observe(name="chat-turn")
def chat(query: str, user_id: str) -> str:
    langfuse_context.update_current_trace(user_id=user_id, tags=["prod"])
    docs = retrieve(query)
    return answer(query, docs)

装饰器自动:

async 函数也支持,直接 @observe async def ... 即可。

哪些函数适合装饰
有明确"输入 → 输出"语义的函数最合适:一次 retrieval、一次 LLM 调用、一个工具函数、一个 agent step。纯数据转换的小辅助函数别装,会把 trace 搞得很吵。

C 路:框架自动 instrument

C1. OpenAI SDK 零改动

from langfuse.openai import openai

# 后面的 openai.chat.completions.create / openai.embeddings.create
# 自动被 trace, 所有调用都出现在 Langfuse, 附带 model + token + cost

C2. LangChain

from langfuse.callback import CallbackHandler

handler = CallbackHandler(session_id="s_abc", user_id="u_42")

chain = prompt | llm | parser
rsp = chain.invoke({"q": "hello"}, config={"callbacks": [handler]})

LangChain 的每个 LCEL 节点都会变成一个 span,嵌套关系完整保留。LangGraph 也是走这个 callback。

C3. LlamaIndex

from llama_index.core import Settings
from langfuse.llama_index import LlamaIndexInstrumentor

LlamaIndexInstrumentor().start()

# 后面 query/retrieve/chat 都自动 trace
index.as_query_engine().query("...")

C4. Anthropic / 其他

Langfuse 官方 SDK 原生包裹 OpenAI;Anthropic / Gemini / Mistral 等通过 traceloop / OpenInference 这些 OTel instrumentation,走 D 路(OTel)最顺——见下节。

D 路:OpenTelemetry OTLP

如果你公司已经在跑 OTel Collector,Langfuse 原生收 OTLP。配置思路:

import base64, os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Langfuse 收 OTLP 的端点
LANGFUSE_HOST = "http://localhost:3000"
auth = base64.b64encode(f"{os.environ['LANGFUSE_PUBLIC_KEY']}:{os.environ['LANGFUSE_SECRET_KEY']}".encode()).decode()

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(
    OTLPSpanExporter(
        endpoint=f"{LANGFUSE_HOST}/api/public/otel/v1/traces",
        headers={"Authorization": f"Basic {auth}"},
    )
))
trace.set_tracer_provider(provider)

# 之后任意 OTel instrumentation 的 span 都能到 Langfuse
from openinference.instrumentation.openai import OpenAIInstrumentor
OpenAIInstrumentor().instrument()

这条路最大的收益:

TypeScript 接入

import { Langfuse } from "langfuse";

const lf = new Langfuse({
  publicKey: process.env.LANGFUSE_PUBLIC_KEY!,
  secretKey: process.env.LANGFUSE_SECRET_KEY!,
  baseUrl:   process.env.LANGFUSE_HOST,
});

const trace = lf.trace({ name: "chat", userId: "u_42" });

const gen = trace.generation({
  name: "answer",
  model: "gpt-4o",
  input: messages,
});

const rsp = await openai.chat.completions.create({ ... });

gen.end({
  output: rsp.choices[0].message.content,
  usage: { input: rsp.usage!.prompt_tokens, output: rsp.usage!.completion_tokens },
});

await lf.flushAsync();

TS 里另一个常用切点是 Vercel AI SDK:

import { streamText } from "ai";
import { LangfuseExporter } from "langfuse-vercel";

// 走 OTel:Vercel AI SDK 自带 experimental_telemetry
const result = await streamText({
  model: openai("gpt-4o"),
  prompt: "...",
  experimental_telemetry: { isEnabled: true },
});

把元数据打全

接完 SDK 只是第一步,真正有用是 metadata 打全。至少这几个字段不要漏:

不要把 PII 当成 metadata
手机号、身份证号、完整邮箱别往 metadata 里塞。需要的话在 SDK 层写个 hook 把 input/output 先脱敏再上报。Langfuse 的 mask 钩子就是给这个用的。

本章小结