任何运行在生产环境的 LangChain 应用,尤其是涉及 Agent 或复杂 Chain 的系统,很快就会变成一个难以理解的黑盒。当用户抱怨响应缓慢时,瓶颈是 LLM 调用、工具执行还是数据解析?当运营部门质询成本时,哪个用户的哪类请求消耗了最多的 gpt-4-turbo
tokens?传统的 APM 工具在这种新兴的、以 LLM 为核心的架构面前显得力不从心,它们为 HTTP 请求和数据库查询而生,却无法理解“Agent 思考过程”、“Token 消耗”或“工具调用元数据”这些核心概念。
问题的本质在于,LLM 应用的可观测性需要一个定制化的解决方案,一个能同时处理结构化事件流和高基数时序指标的系统。我们面临的挑战是,在不显著增加应用延迟的前提下,捕获、存储并可视化每一次 Agent 执行的完整生命周期。
# /callbacks/instrumentation.py
import os
import time
from uuid import UUID
from typing import Any, Dict, List, Union
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import AgentAction, AgentFinish, LLMResult
from .metrics_client import InfluxMetricsClient
from .trace_store import FirestoreTraceStore
class DualCoreObservabilityHandler(BaseCallbackHandler):
"""
一个将追踪数据写入 Firestore 并将时序指标发送到 InfluxDB 的回调处理器。
这是我们可观测性系统的核心数据收集器。
"""
def __init__(self, trace_id: str, user_id: str):
# 确保每个请求链路都有唯一标识
self.trace_id = trace_id
self.user_id = user_id
# 初始化双核存储客户端
self.trace_store = FirestoreTraceStore(collection_name="llm_traces")
self.metrics_client = InfluxMetricsClient(
url=os.environ.get("INFLUXDB_URL"),
token=os.environ.get("INFLUXDB_TOKEN"),
org=os.environ.get("INFLUXDB_ORG"),
bucket="llm_metrics"
)
self.start_time = time.time()
self._steps = []
self._current_step = {}
def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> None:
"""记录整个调用链的开始。"""
self.start_time = time.time()
initial_payload = {
"trace_id": self.trace_id,
"user_id": self.user_id,
"start_time": self.start_time,
"status": "running",
"inputs": inputs,
"steps": [],
}
# 在 Firestore 中创建追踪文档的骨架
self.trace_store.create_trace(self.trace_id, initial_payload)
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""在 LLM 调用开始时,记录当前步骤的开始时间。"""
self._current_step = {
"type": "llm",
"start_time": time.time(),
"prompt": "\n---\n".join(prompts),
}
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""在 LLM 调用结束时,计算指标并记录详细信息。"""
end_time = time.time()
latency = (end_time - self._current_step["start_time"]) * 1000 # ms
# 提取关键的 token 和模型信息
usage = response.llm_output.get("token_usage", {})
prompt_tokens = usage.get("prompt_tokens", 0)
completion_tokens = usage.get("completion_tokens", 0)
model_name = response.llm_output.get("model_name", "unknown")
# 记录追踪数据
self._current_step.update({
"end_time": end_time,
"latency_ms": latency,
"model_name": model_name,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"response": response.generations[0][0].text,
})
self._steps.append(self._current_step)
# 发送指标数据到 InfluxDB
tags = {"user_id": self.user_id, "model_name": model_name, "step_type": "llm"}
fields = {
"latency_ms": float(latency),
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
}
self.metrics_client.write_point("llm_performance", tags, fields)
self._current_step = {}
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> None:
"""记录工具调用的开始。"""
self._current_step = {
"type": "tool",
"start_time": time.time(),
"tool_name": serialized.get("name"),
"tool_input": input_str,
}
def on_tool_end(
self, output: str, **kwargs: Any
) -> None:
"""记录工具调用的结束和性能。"""
end_time = time.time()
latency = (end_time - self._current_step["start_time"]) * 1000
self._current_step.update({
"end_time": end_time,
"latency_ms": latency,
"tool_output": output,
})
self._steps.append(self._current_step)
tags = {"user_id": self.user_id, "tool_name": self._current_step["tool_name"]}
fields = {"latency_ms": float(latency)}
self.metrics_client.write_point("tool_performance", tags, fields)
self._current_step = {}
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""在整个链结束时,更新最终状态和追踪文档。"""
total_latency = (time.time() - self.start_time) * 1000
final_payload = {
"status": "completed",
"end_time": time.time(),
"total_latency_ms": total_latency,
"outputs": outputs,
"steps": self._steps,
}
self.trace_store.update_trace(self.trace_id, final_payload)
tags = {"user_id": self.user_id}
fields = {"total_latency_ms": float(total_latency)}
self.metrics_client.write_point("chain_performance", tags, fields)
# 同样可以实现 on_agent_action, on_agent_finish 等来捕获更丰富的信息...
定义复杂问题:LLM 应用的可观测性维度
一个通用的 APM 方案,例如将 OpenTelemetry 集成到应用中,再搭配 Prometheus 和 Jaeger,是许多微服务架构的标准实践。然而,在 LangChain 应用场景下,这种方案存在几个根本性的不匹配:
- 数据模型的错位:传统 APM 的核心是 Span,它为请求而设计,包含
service_name
、http_method
等标签。但我们真正关心的是model_name
、prompt_tokens
、tool_name
和agent_thought
。将这些 LLM 特有的元数据硬塞进通用标签中,会使查询和分析变得异常笨拙和低效。 - 高基数维度的挑战:我们希望按
user_id
、session_id
甚至request_id
来分析成本和性能。在用户量大的系统中,这会导致 Prometheus 的时序数据基数爆炸,严重影响其性能和存储效率。 - 非结构化数据的存储:Agent 的思考过程、完整的 prompt 和 response,这些是富文本、半结构化的数据。将它们存储在 Jaeger 的日志或标签中,不仅查询困难,而且不适合做深度分析。它更像是一份日志,而非一份可供分析的结构化文档。
因此,直接套用传统方案,最终会得到一个既昂贵又难用的系统。我们需要的是一个为 LLM 应用量身定制的架构。
方案 A:基于 OpenTelemetry 的扩展方案
这个方案的思路是扩展现有的标准,而非完全自研。
优点:
- 遵循行业标准,生态成熟,可以与现有的基础设施(如服务网格、网关)无缝集成。
- 有大量的现成 Collector 和后端存储(Jaeger, Zipkin, Prometheus)可供选择。
- 社区活跃,客户端 SDK 语言支持广泛。
缺点与实现难点:
- 语义约定: 需要定义一套全新的 OpenTelemetry Semantic Conventions for LLM。例如,
llm.model.name
、llm.usage.prompt_tokens
等。这需要团队内部达成共识并严格遵守,否则数据将混乱不堪。 - 后端查询复杂性: 在 Grafana 中,你需要编写极其复杂的 PromQL 或 LogQL 查询,通过多个标签
JOIN
来关联一个请求中的 LLM 调用和工具调用,这几乎是不现实的。 - 存储成本: 为了保存完整的 prompt/response,你可能会将它们作为 Span 的日志或属性。这会导致后端存储(如 Elasticsearch 或 Jaeger with Cassandra)的索引和存储成本急剧上升。
- 语义约定: 需要定义一套全新的 OpenTelemetry Semantic Conventions for LLM。例如,
这个方案看似“标准”,但在实践中,为了适应 LLM 的特殊性,你需要进行大量的定制和变通,最终可能违背了使用标准的初衷。
方案 B:Firestore 与 InfluxDB 的双核存储方案
这个方案的核心是承认不同类型数据的存储需求是不同的,并为它们选择最合适的工具。
flowchart TD subgraph App Host A[LangChain Agent] -- "triggers" --> B(DualCoreObservabilityHandler); end subgraph Observability Pipeline B -- "Trace Document (JSON)" --> C[Firestore]; B -- "Metrics Points (Line Protocol)" --> D[InfluxDB]; end subgraph Visualization Layer E[UnoCSS Dashboard]-- "Queries for traces" --> C; E -- "Queries for metrics (Flux)" --> D; end Dev[Developer] --> E;
架构选择理由:
- Firestore for Traces: Agent 的执行过程本质上是一个事件流,最终构成一个结构复杂、可能深度嵌套的文档。Firestore 的文档模型是存储这类数据的理想选择。它的 schemaless 特性允许我们轻松地增加新的字段,而无需进行数据库迁移。实时更新能力也为构建一个能直播 Agent 执行过程的调试工具提供了可能。
- InfluxDB for Metrics: 延迟、Token 数量、调用次数等都是典型的时序数据。InfluxDB 专为此类数据设计,具备极高的数据写入吞吐量和查询性能,其内置的 Flux 查询语言和时间窗口函数能非常高效地进行聚合分析,例如计算“过去一小时 P99 延迟”或“每个用户昨日的 Token 总消耗”。
- UnoCSS for Dashboard: 我们需要一个高度定制化的前端来同时展示来自两个数据源的信息。一个典型的视图是:上方是来自 InfluxDB 的性能图表,下方是来自 Firestore 的详细调用追踪。使用 UnoCSS 这种原子化 CSS 框架,可以极快地构建出轻量、高性能、数据密集型的 UI,而无需引入重型前端框架的复杂性和构建开销。
潜在问题:
- 数据一致性: 写入是分离的,可能出现 Firestore 写入成功但 InfluxDB 写入失败的情况。这会导致有追踪但无指标。在实践中,可以通过在回调处理器中增加重试和死信队列逻辑来缓解,但无法做到原子性保证。对于可观测性系统,这种偶发的不一致通常是可以接受的。
- 开发成本: 相比使用现成的 Grafana,构建一个自定义前端需要额外的工作。但考虑到所能获得的定制化体验和查询效率,这种投入是值得的。
最终,我们选择方案 B。它通过专业分工,让每个组件都做自己最擅长的事,从而构建一个在性能、成本和功能上都远超通用方案的专用可观测性平台。
核心实现概览
1. 后端数据存储客户端
除了上面的回调处理器,我们还需要两个健壮的客户端与数据库交互。
# /callbacks/metrics_client.py
import logging
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
class InfluxMetricsClient:
"""一个简单的 InfluxDB 客户端封装,用于写入性能指标。"""
def __init__(self, url: str, token: str, org: str, bucket: str):
try:
self._client = InfluxDBClient(url=url, token=token, org=org)
self._write_api = self._client.write_api(write_options=SYNCHRONOUS)
self.bucket = bucket
self.org = org
except Exception as e:
logging.error(f"Failed to connect to InfluxDB: {e}")
self._client = None
self._write_api = None
def write_point(self, measurement: str, tags: Dict[str, str], fields: Dict[str, Any]):
"""
写入一个数据点。
在生产环境中,这里应该改为异步写入或批量写入,以减少对主应用的影响。
"""
if not self._write_api:
logging.warning("InfluxDB client not initialized. Skipping metric write.")
return
point = Point(measurement)
for tag_key, tag_value in tags.items():
point.tag(tag_key, tag_value)
for field_key, field_value in fields.items():
point.field(field_key, field_value)
try:
self._write_api.write(bucket=self.bucket, org=self.org, record=point)
except Exception as e:
# 关键:必须捕获异常,防止可观测性系统的故障影响主业务逻辑。
logging.error(f"Failed to write point to InfluxDB: {e}")
def close(self):
if self._client:
self._client.close()
# /callbacks/trace_store.py
import logging
import firebase_admin
from firebase_admin import credentials, firestore
class FirestoreTraceStore:
"""封装与 Firestore 的交互,用于存储和更新追踪文档。"""
_db = None
def __init__(self, collection_name: str):
if not firebase_admin._apps:
# 在真实项目中,SDK 初始化应该在应用启动时完成,这里仅为示例。
try:
cred = credentials.ApplicationDefault()
firebase_admin.initialize_app(cred)
except Exception as e:
logging.error(f"Failed to initialize Firebase Admin SDK: {e}")
if firebase_admin._apps:
self._db = firestore.client()
self.collection_ref = self._db.collection(collection_name)
else:
self._db = None
self.collection_ref = None
def create_trace(self, trace_id: str, payload: Dict[str, Any]):
if not self.collection_ref:
logging.warning("Firestore client not initialized. Skipping trace creation.")
return
try:
self.collection_ref.document(trace_id).set(payload)
except Exception as e:
logging.error(f"Failed to create trace in Firestore: {e}")
def update_trace(self, trace_id: str, payload: Dict[str, Any]):
if not self.collection_ref:
logging.warning("Firestore client not initialized. Skipping trace update.")
return
try:
# 使用 merge=True 来更新文档,而不是覆盖。
self.collection_ref.document(trace_id).set(payload, merge=True)
except Exception as e:
logging.error(f"Failed to update trace in Firestore: {e}")
2. 前端可视化仪表盘 (UnoCSS)
这是一个独立的 HTML 文件,展示了如何用 UnoCSS 快速构建一个数据展示界面。它不依赖任何构建工具,直接通过 CDN 使用,非常适合快速原型或内部工具。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>LLM Observability Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/@unocss/runtime"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
/* UnoCSS 运行时需要一个占位符 */
:root { --un-default-border-color: #e5e7eb; }
.un-bg-gray-800 { --un-bg-opacity: 1; background-color: rgba(31, 41, 55, var(--un-bg-opacity)); }
.dark .un-bg-gray-800 { --un-bg-opacity: 1; background-color: rgba(31, 41, 55, var(--un-bg-opacity)); }
</style>
</head>
<body class="bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 font-sans">
<div class="h-screen w-screen flex">
<!-- Sidebar for Traces -->
<aside class="w-96 bg-white dark:bg-gray-800 p-4 border-r border-gray-200 dark:border-gray-700 overflow-y-auto">
<h2 class="text-xl font-bold mb-4">Recent Traces</h2>
<div id="trace-list" class="space-y-2">
<!-- Trace items will be injected here -->
</div>
</aside>
<!-- Main Content -->
<main class="flex-1 p-6 flex flex-col">
<header class="mb-6">
<h1 class="text-3xl font-bold">Trace Details</h1>
<p id="selected-trace-id" class="text-gray-500">Select a trace to see details</p>
</header>
<!-- Metrics Charts from InfluxDB -->
<section class="grid grid-cols-1 md:grid-cols-3 gap-4 mb-6">
<div class="bg-white dark:bg-gray-800 p-4 rounded-lg shadow">
<h3 class="font-semibold mb-2">Total Latency (ms)</h3>
<canvas id="latencyChart"></canvas>
</div>
<div class="bg-white dark:bg-gray-800 p-4 rounded-lg shadow">
<h3 class="font-semibold mb-2">Token Usage</h3>
<canvas id="tokenChart"></canvas>
</div>
<div class="bg-white dark:bg-gray-800 p-4 rounded-lg shadow">
<h3 class="font-semibold mb-2">Cost (USD)</h3>
<p class="text-3xl font-bold text-green-500">$0.1234</p>
</div>
</section>
<!-- Trace Steps from Firestore -->
<section class="flex-1 bg-white dark:bg-gray-800 p-4 rounded-lg shadow overflow-y-auto">
<h3 class="font-semibold mb-4">Execution Steps</h3>
<div id="steps-container" class="font-mono text-sm space-y-4">
<!-- Step details will be injected here -->
</div>
</section>
</main>
</div>
<script>
// In a real app, this data would come from API calls to your backend,
// which in turn queries Firestore and InfluxDB.
const mockTraces = [
{ id: 'trace-123', status: 'completed', total_latency_ms: 1250.5, user_id: 'user-A' },
{ id: 'trace-456', status: 'failed', total_latency_ms: 830.2, user_id: 'user-B' },
{ id: 'trace-789', status: 'completed', total_latency_ms: 3410.0, user_id: 'user-A' },
];
const mockTraceDetails = {
'trace-123': {
steps: [
{ type: 'llm', latency_ms: 800, model_name: 'gpt-4', prompt_tokens: 500, completion_tokens: 150, response: 'Thought: I should use the search tool.' },
{ type: 'tool', latency_ms: 400, tool_name: 'search', tool_input: 'latest AI news', tool_output: '[...]' }
]
}
};
function renderTraceList() {
const listEl = document.getElementById('trace-list');
listEl.innerHTML = mockTraces.map(trace => `
<div class="p-3 rounded-md cursor-pointer hover:bg-gray-100 dark:hover:bg-gray-700 border border-gray-200 dark:border-gray-600"
onclick="selectTrace('${trace.id}')">
<p class="font-semibold truncate">${trace.id}</p>
<div class="flex justify-between text-xs text-gray-500 dark:text-gray-400">
<span>${trace.status === 'completed' ? '✅' : '❌'} ${trace.total_latency_ms.toFixed(0)}ms</span>
<span>${trace.user_id}</span>
</div>
</div>
`).join('');
}
function selectTrace(traceId) {
document.getElementById('selected-trace-id').textContent = `Trace ID: ${traceId}`;
const details = mockTraceDetails[traceId];
const container = document.getElementById('steps-container');
if (details) {
container.innerHTML = details.steps.map(step => {
if (step.type === 'llm') {
return `<div class="p-3 bg-blue-50 dark:bg-blue-900/50 border-l-4 border-blue-500 rounded">
<div class="flex justify-between items-center font-bold">
<span>LLM Call (${step.model_name})</span>
<span class="text-blue-500">${step.latency_ms.toFixed(0)}ms</span>
</div>
<div class="text-xs text-gray-500 mt-1">
Tokens: ${step.prompt_tokens} (p) + ${step.completion_tokens} (c)
</div>
<pre class="mt-2 p-2 bg-gray-100 dark:bg-gray-700 rounded text-xs whitespace-pre-wrap">${step.response}</pre>
</div>`;
}
// ... render tool steps similarly
return '';
}).join('');
}
}
// Initialize charts (mock data)
new Chart(document.getElementById('latencyChart'), { type: 'line', data: { labels: ['-5m', '-4m', '-3m', '-2m', '-1m'], datasets: [{ label: 'P95 Latency', data: [1100, 1200, 1050, 1300, 1250], tension: 0.1 }] } });
new Chart(document.getElementById('tokenChart'), { type: 'bar', data: { labels: ['Prompt', 'Completion'], datasets: [{ label: 'Tokens', data: [45000, 12000] }] } });
renderTraceList();
</script>
</body>
</html>
这个 HTML 文件完全自包含。你只需用浏览器打开它,就能看到一个由 UnoCSS 驱动的界面。它通过 class
属性直接应用样式,如 flex
, p-4
, rounded-lg
, dark:bg-gray-800
,实现了无需编写任何 CSS 文件的开发体验,同时保持了极高的性能和可维护性。
架构的扩展性与局限性
此双核架构为后续的扩展提供了坚实的基础。例如,可以在 on_chain_end
回调中增加一个 cost
字段,通过查询模型价格表实时计算成本,并将该指标也写入 InfluxDB。我们还可以在 Firestore 文档中增加一个 user_feedback
字段,让用户能对不满意的 Agent 响应进行标记,从而形成一个完整的数据闭环。通过 Firebase Functions,我们可以监听 Firestore 中 status: "failed"
的新文档,并自动触发告警。
然而,这个方案并非银弹。它最大的局限性在于数据写入的非原子性。在极端情况下(例如,Firestore 写入后、InfluxDB 写入前,服务崩溃),会导致数据不一致。对于一个可观测性系统,这通常是可以容忍的,但对于需要强一致性的计费或审计系统,则需要引入更复杂的机制,如通过消息队列进行解耦和持久化。
此外,当前的回调处理器是同步执行数据格式化和发送的。在高 QPS 场景下,这可能会对主应用线程造成微小的延迟。一个进阶的优化是将写入操作完全异步化,例如将数据点推送到一个内存队列中,由一个独立的线程池负责批量消费并发送到数据库。
最后,这个系统是为单个应用设计的。如果 LangChain 服务是大型微服务架构的一部分,要实现端到端的全链路追踪,仍需将我们的 trace_id
与上游服务的 OpenTelemetry trace_id
进行关联,这需要更深层次的上下文传递机制。