为 LangChain 应用构建基于 Firestore 与 InfluxDB 的双核可观测性系统


任何运行在生产环境的 LangChain 应用,尤其是涉及 Agent 或复杂 Chain 的系统,很快就会变成一个难以理解的黑盒。当用户抱怨响应缓慢时,瓶颈是 LLM 调用、工具执行还是数据解析?当运营部门质询成本时,哪个用户的哪类请求消耗了最多的 gpt-4-turbo tokens?传统的 APM 工具在这种新兴的、以 LLM 为核心的架构面前显得力不从心,它们为 HTTP 请求和数据库查询而生,却无法理解“Agent 思考过程”、“Token 消耗”或“工具调用元数据”这些核心概念。

问题的本质在于,LLM 应用的可观测性需要一个定制化的解决方案,一个能同时处理结构化事件流和高基数时序指标的系统。我们面临的挑战是,在不显著增加应用延迟的前提下,捕获、存储并可视化每一次 Agent 执行的完整生命周期。

# /callbacks/instrumentation.py
import os
import time
from uuid import UUID
from typing import Any, Dict, List, Union

from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import AgentAction, AgentFinish, LLMResult

from .metrics_client import InfluxMetricsClient
from .trace_store import FirestoreTraceStore

class DualCoreObservabilityHandler(BaseCallbackHandler):
    """
    一个将追踪数据写入 Firestore 并将时序指标发送到 InfluxDB 的回调处理器。
    这是我们可观测性系统的核心数据收集器。
    """
    def __init__(self, trace_id: str, user_id: str):
        # 确保每个请求链路都有唯一标识
        self.trace_id = trace_id
        self.user_id = user_id

        # 初始化双核存储客户端
        self.trace_store = FirestoreTraceStore(collection_name="llm_traces")
        self.metrics_client = InfluxMetricsClient(
            url=os.environ.get("INFLUXDB_URL"),
            token=os.environ.get("INFLUXDB_TOKEN"),
            org=os.environ.get("INFLUXDB_ORG"),
            bucket="llm_metrics"
        )
        
        self.start_time = time.time()
        self._steps = []
        self._current_step = {}

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """记录整个调用链的开始。"""
        self.start_time = time.time()
        initial_payload = {
            "trace_id": self.trace_id,
            "user_id": self.user_id,
            "start_time": self.start_time,
            "status": "running",
            "inputs": inputs,
            "steps": [],
        }
        # 在 Firestore 中创建追踪文档的骨架
        self.trace_store.create_trace(self.trace_id, initial_payload)

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """在 LLM 调用开始时,记录当前步骤的开始时间。"""
        self._current_step = {
            "type": "llm",
            "start_time": time.time(),
            "prompt": "\n---\n".join(prompts),
        }

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """在 LLM 调用结束时,计算指标并记录详细信息。"""
        end_time = time.time()
        latency = (end_time - self._current_step["start_time"]) * 1000  # ms
        
        # 提取关键的 token 和模型信息
        usage = response.llm_output.get("token_usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        model_name = response.llm_output.get("model_name", "unknown")

        # 记录追踪数据
        self._current_step.update({
            "end_time": end_time,
            "latency_ms": latency,
            "model_name": model_name,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "response": response.generations[0][0].text,
        })
        self._steps.append(self._current_step)

        # 发送指标数据到 InfluxDB
        tags = {"user_id": self.user_id, "model_name": model_name, "step_type": "llm"}
        fields = {
            "latency_ms": float(latency),
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        }
        self.metrics_client.write_point("llm_performance", tags, fields)
        
        self._current_step = {}

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> None:
        """记录工具调用的开始。"""
        self._current_step = {
            "type": "tool",
            "start_time": time.time(),
            "tool_name": serialized.get("name"),
            "tool_input": input_str,
        }

    def on_tool_end(
        self, output: str, **kwargs: Any
    ) -> None:
        """记录工具调用的结束和性能。"""
        end_time = time.time()
        latency = (end_time - self._current_step["start_time"]) * 1000

        self._current_step.update({
            "end_time": end_time,
            "latency_ms": latency,
            "tool_output": output,
        })
        self._steps.append(self._current_step)

        tags = {"user_id": self.user_id, "tool_name": self._current_step["tool_name"]}
        fields = {"latency_ms": float(latency)}
        self.metrics_client.write_point("tool_performance", tags, fields)

        self._current_step = {}
        
    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """在整个链结束时,更新最终状态和追踪文档。"""
        total_latency = (time.time() - self.start_time) * 1000
        final_payload = {
            "status": "completed",
            "end_time": time.time(),
            "total_latency_ms": total_latency,
            "outputs": outputs,
            "steps": self._steps,
        }
        self.trace_store.update_trace(self.trace_id, final_payload)
        
        tags = {"user_id": self.user_id}
        fields = {"total_latency_ms": float(total_latency)}
        self.metrics_client.write_point("chain_performance", tags, fields)

    # 同样可以实现 on_agent_action, on_agent_finish 等来捕获更丰富的信息...

定义复杂问题:LLM 应用的可观测性维度

一个通用的 APM 方案,例如将 OpenTelemetry 集成到应用中,再搭配 Prometheus 和 Jaeger,是许多微服务架构的标准实践。然而,在 LangChain 应用场景下,这种方案存在几个根本性的不匹配:

  1. 数据模型的错位:传统 APM 的核心是 Span,它为请求而设计,包含 service_namehttp_method 等标签。但我们真正关心的是 model_nameprompt_tokenstool_nameagent_thought。将这些 LLM 特有的元数据硬塞进通用标签中,会使查询和分析变得异常笨拙和低效。
  2. 高基数维度的挑战:我们希望按 user_idsession_id 甚至 request_id 来分析成本和性能。在用户量大的系统中,这会导致 Prometheus 的时序数据基数爆炸,严重影响其性能和存储效率。
  3. 非结构化数据的存储:Agent 的思考过程、完整的 prompt 和 response,这些是富文本、半结构化的数据。将它们存储在 Jaeger 的日志或标签中,不仅查询困难,而且不适合做深度分析。它更像是一份日志,而非一份可供分析的结构化文档。

因此,直接套用传统方案,最终会得到一个既昂贵又难用的系统。我们需要的是一个为 LLM 应用量身定制的架构。

方案 A:基于 OpenTelemetry 的扩展方案

这个方案的思路是扩展现有的标准,而非完全自研。

  • 优点:

    • 遵循行业标准,生态成熟,可以与现有的基础设施(如服务网格、网关)无缝集成。
    • 有大量的现成 Collector 和后端存储(Jaeger, Zipkin, Prometheus)可供选择。
    • 社区活跃,客户端 SDK 语言支持广泛。
  • 缺点与实现难点:

    • 语义约定: 需要定义一套全新的 OpenTelemetry Semantic Conventions for LLM。例如,llm.model.namellm.usage.prompt_tokens 等。这需要团队内部达成共识并严格遵守,否则数据将混乱不堪。
    • 后端查询复杂性: 在 Grafana 中,你需要编写极其复杂的 PromQL 或 LogQL 查询,通过多个标签 JOIN 来关联一个请求中的 LLM 调用和工具调用,这几乎是不现实的。
    • 存储成本: 为了保存完整的 prompt/response,你可能会将它们作为 Span 的日志或属性。这会导致后端存储(如 Elasticsearch 或 Jaeger with Cassandra)的索引和存储成本急剧上升。

这个方案看似“标准”,但在实践中,为了适应 LLM 的特殊性,你需要进行大量的定制和变通,最终可能违背了使用标准的初衷。

方案 B:Firestore 与 InfluxDB 的双核存储方案

这个方案的核心是承认不同类型数据的存储需求是不同的,并为它们选择最合适的工具。

flowchart TD
    subgraph App Host
        A[LangChain Agent] -- "triggers" --> B(DualCoreObservabilityHandler);
    end

    subgraph Observability Pipeline
        B -- "Trace Document (JSON)" --> C[Firestore];
        B -- "Metrics Points (Line Protocol)" --> D[InfluxDB];
    end
    
    subgraph Visualization Layer
        E[UnoCSS Dashboard]-- "Queries for traces" --> C;
        E -- "Queries for metrics (Flux)" --> D;
    end

    Dev[Developer] --> E;
  • 架构选择理由:

    • Firestore for Traces: Agent 的执行过程本质上是一个事件流,最终构成一个结构复杂、可能深度嵌套的文档。Firestore 的文档模型是存储这类数据的理想选择。它的 schemaless 特性允许我们轻松地增加新的字段,而无需进行数据库迁移。实时更新能力也为构建一个能直播 Agent 执行过程的调试工具提供了可能。
    • InfluxDB for Metrics: 延迟、Token 数量、调用次数等都是典型的时序数据。InfluxDB 专为此类数据设计,具备极高的数据写入吞吐量和查询性能,其内置的 Flux 查询语言和时间窗口函数能非常高效地进行聚合分析,例如计算“过去一小时 P99 延迟”或“每个用户昨日的 Token 总消耗”。
    • UnoCSS for Dashboard: 我们需要一个高度定制化的前端来同时展示来自两个数据源的信息。一个典型的视图是:上方是来自 InfluxDB 的性能图表,下方是来自 Firestore 的详细调用追踪。使用 UnoCSS 这种原子化 CSS 框架,可以极快地构建出轻量、高性能、数据密集型的 UI,而无需引入重型前端框架的复杂性和构建开销。
  • 潜在问题:

    • 数据一致性: 写入是分离的,可能出现 Firestore 写入成功但 InfluxDB 写入失败的情况。这会导致有追踪但无指标。在实践中,可以通过在回调处理器中增加重试和死信队列逻辑来缓解,但无法做到原子性保证。对于可观测性系统,这种偶发的不一致通常是可以接受的。
    • 开发成本: 相比使用现成的 Grafana,构建一个自定义前端需要额外的工作。但考虑到所能获得的定制化体验和查询效率,这种投入是值得的。

最终,我们选择方案 B。它通过专业分工,让每个组件都做自己最擅长的事,从而构建一个在性能、成本和功能上都远超通用方案的专用可观测性平台。

核心实现概览

1. 后端数据存储客户端

除了上面的回调处理器,我们还需要两个健壮的客户端与数据库交互。

# /callbacks/metrics_client.py
import logging
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class InfluxMetricsClient:
    """一个简单的 InfluxDB 客户端封装,用于写入性能指标。"""
    def __init__(self, url: str, token: str, org: str, bucket: str):
        try:
            self._client = InfluxDBClient(url=url, token=token, org=org)
            self._write_api = self._client.write_api(write_options=SYNCHRONOUS)
            self.bucket = bucket
            self.org = org
        except Exception as e:
            logging.error(f"Failed to connect to InfluxDB: {e}")
            self._client = None
            self._write_api = None

    def write_point(self, measurement: str, tags: Dict[str, str], fields: Dict[str, Any]):
        """
        写入一个数据点。
        在生产环境中,这里应该改为异步写入或批量写入,以减少对主应用的影响。
        """
        if not self._write_api:
            logging.warning("InfluxDB client not initialized. Skipping metric write.")
            return

        point = Point(measurement)
        for tag_key, tag_value in tags.items():
            point.tag(tag_key, tag_value)
        for field_key, field_value in fields.items():
            point.field(field_key, field_value)
        
        try:
            self._write_api.write(bucket=self.bucket, org=self.org, record=point)
        except Exception as e:
            # 关键:必须捕获异常,防止可观测性系统的故障影响主业务逻辑。
            logging.error(f"Failed to write point to InfluxDB: {e}")

    def close(self):
        if self._client:
            self._client.close()

# /callbacks/trace_store.py
import logging
import firebase_admin
from firebase_admin import credentials, firestore

class FirestoreTraceStore:
    """封装与 Firestore 的交互,用于存储和更新追踪文档。"""
    _db = None

    def __init__(self, collection_name: str):
        if not firebase_admin._apps:
            # 在真实项目中,SDK 初始化应该在应用启动时完成,这里仅为示例。
            try:
                cred = credentials.ApplicationDefault()
                firebase_admin.initialize_app(cred)
            except Exception as e:
                logging.error(f"Failed to initialize Firebase Admin SDK: {e}")
        
        if firebase_admin._apps:
            self._db = firestore.client()
            self.collection_ref = self._db.collection(collection_name)
        else:
            self._db = None
            self.collection_ref = None

    def create_trace(self, trace_id: str, payload: Dict[str, Any]):
        if not self.collection_ref:
            logging.warning("Firestore client not initialized. Skipping trace creation.")
            return
        try:
            self.collection_ref.document(trace_id).set(payload)
        except Exception as e:
            logging.error(f"Failed to create trace in Firestore: {e}")

    def update_trace(self, trace_id: str, payload: Dict[str, Any]):
        if not self.collection_ref:
            logging.warning("Firestore client not initialized. Skipping trace update.")
            return
        try:
            # 使用 merge=True 来更新文档,而不是覆盖。
            self.collection_ref.document(trace_id).set(payload, merge=True)
        except Exception as e:
            logging.error(f"Failed to update trace in Firestore: {e}")

2. 前端可视化仪表盘 (UnoCSS)

这是一个独立的 HTML 文件,展示了如何用 UnoCSS 快速构建一个数据展示界面。它不依赖任何构建工具,直接通过 CDN 使用,非常适合快速原型或内部工具。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>LLM Observability Dashboard</title>
    <script src="https://cdn.jsdelivr.net/npm/@unocss/runtime"></script>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        /* UnoCSS 运行时需要一个占位符 */
        :root { --un-default-border-color: #e5e7eb; }
        .un-bg-gray-800 { --un-bg-opacity: 1; background-color: rgba(31, 41, 55, var(--un-bg-opacity)); }
        .dark .un-bg-gray-800 { --un-bg-opacity: 1; background-color: rgba(31, 41, 55, var(--un-bg-opacity)); }
    </style>
</head>
<body class="bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 font-sans">

<div class="h-screen w-screen flex">
    <!-- Sidebar for Traces -->
    <aside class="w-96 bg-white dark:bg-gray-800 p-4 border-r border-gray-200 dark:border-gray-700 overflow-y-auto">
        <h2 class="text-xl font-bold mb-4">Recent Traces</h2>
        <div id="trace-list" class="space-y-2">
            <!-- Trace items will be injected here -->
        </div>
    </aside>

    <!-- Main Content -->
    <main class="flex-1 p-6 flex flex-col">
        <header class="mb-6">
            <h1 class="text-3xl font-bold">Trace Details</h1>
            <p id="selected-trace-id" class="text-gray-500">Select a trace to see details</p>
        </header>
        
        <!-- Metrics Charts from InfluxDB -->
        <section class="grid grid-cols-1 md:grid-cols-3 gap-4 mb-6">
            <div class="bg-white dark:bg-gray-800 p-4 rounded-lg shadow">
                <h3 class="font-semibold mb-2">Total Latency (ms)</h3>
                <canvas id="latencyChart"></canvas>
            </div>
            <div class="bg-white dark:bg-gray-800 p-4 rounded-lg shadow">
                <h3 class="font-semibold mb-2">Token Usage</h3>
                <canvas id="tokenChart"></canvas>
            </div>
            <div class="bg-white dark:bg-gray-800 p-4 rounded-lg shadow">
                <h3 class="font-semibold mb-2">Cost (USD)</h3>
                <p class="text-3xl font-bold text-green-500">$0.1234</p>
            </div>
        </section>

        <!-- Trace Steps from Firestore -->
        <section class="flex-1 bg-white dark:bg-gray-800 p-4 rounded-lg shadow overflow-y-auto">
            <h3 class="font-semibold mb-4">Execution Steps</h3>
            <div id="steps-container" class="font-mono text-sm space-y-4">
                <!-- Step details will be injected here -->
            </div>
        </section>
    </main>
</div>

<script>
    // In a real app, this data would come from API calls to your backend,
    // which in turn queries Firestore and InfluxDB.
    const mockTraces = [
        { id: 'trace-123', status: 'completed', total_latency_ms: 1250.5, user_id: 'user-A' },
        { id: 'trace-456', status: 'failed', total_latency_ms: 830.2, user_id: 'user-B' },
        { id: 'trace-789', status: 'completed', total_latency_ms: 3410.0, user_id: 'user-A' },
    ];
    
    const mockTraceDetails = {
        'trace-123': {
            steps: [
                { type: 'llm', latency_ms: 800, model_name: 'gpt-4', prompt_tokens: 500, completion_tokens: 150, response: 'Thought: I should use the search tool.' },
                { type: 'tool', latency_ms: 400, tool_name: 'search', tool_input: 'latest AI news', tool_output: '[...]' }
            ]
        }
    };

    function renderTraceList() {
        const listEl = document.getElementById('trace-list');
        listEl.innerHTML = mockTraces.map(trace => `
            <div class="p-3 rounded-md cursor-pointer hover:bg-gray-100 dark:hover:bg-gray-700 border border-gray-200 dark:border-gray-600"
                 onclick="selectTrace('${trace.id}')">
                <p class="font-semibold truncate">${trace.id}</p>
                <div class="flex justify-between text-xs text-gray-500 dark:text-gray-400">
                    <span>${trace.status === 'completed' ? '✅' : '❌'} ${trace.total_latency_ms.toFixed(0)}ms</span>
                    <span>${trace.user_id}</span>
                </div>
            </div>
        `).join('');
    }

    function selectTrace(traceId) {
        document.getElementById('selected-trace-id').textContent = `Trace ID: ${traceId}`;
        const details = mockTraceDetails[traceId];
        const container = document.getElementById('steps-container');
        if (details) {
            container.innerHTML = details.steps.map(step => {
                if (step.type === 'llm') {
                    return `<div class="p-3 bg-blue-50 dark:bg-blue-900/50 border-l-4 border-blue-500 rounded">
                                <div class="flex justify-between items-center font-bold">
                                    <span>LLM Call (${step.model_name})</span>
                                    <span class="text-blue-500">${step.latency_ms.toFixed(0)}ms</span>
                                </div>
                                <div class="text-xs text-gray-500 mt-1">
                                    Tokens: ${step.prompt_tokens} (p) + ${step.completion_tokens} (c)
                                </div>
                                <pre class="mt-2 p-2 bg-gray-100 dark:bg-gray-700 rounded text-xs whitespace-pre-wrap">${step.response}</pre>
                            </div>`;
                }
                // ... render tool steps similarly
                return '';
            }).join('');
        }
    }
    
    // Initialize charts (mock data)
    new Chart(document.getElementById('latencyChart'), { type: 'line', data: { labels: ['-5m', '-4m', '-3m', '-2m', '-1m'], datasets: [{ label: 'P95 Latency', data: [1100, 1200, 1050, 1300, 1250], tension: 0.1 }] } });
    new Chart(document.getElementById('tokenChart'), { type: 'bar', data: { labels: ['Prompt', 'Completion'], datasets: [{ label: 'Tokens', data: [45000, 12000] }] } });

    renderTraceList();
</script>
</body>
</html>

这个 HTML 文件完全自包含。你只需用浏览器打开它,就能看到一个由 UnoCSS 驱动的界面。它通过 class 属性直接应用样式,如 flex, p-4, rounded-lg, dark:bg-gray-800,实现了无需编写任何 CSS 文件的开发体验,同时保持了极高的性能和可维护性。

架构的扩展性与局限性

此双核架构为后续的扩展提供了坚实的基础。例如,可以在 on_chain_end 回调中增加一个 cost 字段,通过查询模型价格表实时计算成本,并将该指标也写入 InfluxDB。我们还可以在 Firestore 文档中增加一个 user_feedback 字段,让用户能对不满意的 Agent 响应进行标记,从而形成一个完整的数据闭环。通过 Firebase Functions,我们可以监听 Firestore 中 status: "failed" 的新文档,并自动触发告警。

然而,这个方案并非银弹。它最大的局限性在于数据写入的非原子性。在极端情况下(例如,Firestore 写入后、InfluxDB 写入前,服务崩溃),会导致数据不一致。对于一个可观测性系统,这通常是可以容忍的,但对于需要强一致性的计费或审计系统,则需要引入更复杂的机制,如通过消息队列进行解耦和持久化。

此外,当前的回调处理器是同步执行数据格式化和发送的。在高 QPS 场景下,这可能会对主应用线程造成微小的延迟。一个进阶的优化是将写入操作完全异步化,例如将数据点推送到一个内存队列中,由一个独立的线程池负责批量消费并发送到数据库。

最后,这个系统是为单个应用设计的。如果 LangChain 服务是大型微服务架构的一部分,要实现端到端的全链路追踪,仍需将我们的 trace_id 与上游服务的 OpenTelemetry trace_id 进行关联,这需要更深层次的上下文传递机制。


  目录