构建自定义Rollup插件为Azure AKS上的多语言AI数据服务注入全链路追踪上下文


我们新上线的AI数据科学平台遇到了一个典型的“黑盒”困境。用户在前端发起一个复杂的分析请求,这个请求会触发部署在Azure AKS上的一系列微服务:一个Java后端负责业务逻辑编排和元数据查询(通过MyBatis),并调用一个Python编写的AI模型服务进行核心计算。当一个请求耗时超过预期时,定位瓶颈成了一场跨团队的“甩锅大会”:前端指责后端接口慢,后端怀疑AI模型耗时长,数据科学团队则认为是数据库查询拖了后腿。日志是分散的,关联排查的成本极高。

为了解决这个问题,我们决定引入基于OpenTelemetry的全链路分布式追踪。目标很明确:任何一个用户请求,无论它跨越多少语言、多少服务,都必须拥有一个唯一的Trace ID,并将整个调用链可视化。后端的Java和Python服务通过引入OpenTelemetry SDK和Agent相对容易实现,但真正的挑战在于源头——如何确保从浏览器发起的第一个HTTP请求就被无感地、强制地注入追踪上下文?

在真实项目中,依赖开发者手动在每个fetchaxios调用中添加traceparent头是不可靠的。它容易被遗忘,难以通过代码审查强制执行,并且对业务代码有侵入性。我们的解决方案是:在前端构建阶段,通过一个自定义的Rollup插件,利用抽象语法树(AST)自动改写所有API调用代码,强制注入追踪上下文。

架构概览与追踪流程设计

在深入代码之前,我们先定义整个系统的追踪流程。它必须能够无缝地串联起不同技术栈的组件。

sequenceDiagram
    participant Browser as 浏览器 (Rollup构建)
    participant JavaService as Java业务服务 (AKS)
    participant MyBatis
    participant PythonAIService as Python AI服务 (AKS)
    participant OTelCollector as OpenTelemetry Collector

    Browser->>+JavaService: 发起API请求 (自动注入traceparent)
    JavaService->>+MyBatis: 执行数据库查询
    MyBatis-->>-JavaService: 返回查询结果
    JavaService->>+PythonAIService: 调用模型 (传递traceparent)
    PythonAIService-->>-JavaService: 返回模型结果
    JavaService-->>-Browser: 返回最终响应

    Browser->>OTelCollector: 上报 Frontend Span
    JavaService->>OTelCollector: 上报 Backend Spans (HTTP, MyBatis)
    PythonAIService->>OTelCollector: 上报 AI Service Span

这个流程的核心在于,浏览器发起的第一个请求必须包含一个合法的traceparent W3C追踪上下文头。后续所有服务都将延续这个上下文。

后端服务的OpenTelemetry准备

要在前端注入追踪上下文,我们首先需要一个能够接收和处理它的后端。

Java业务服务 (Spring Boot + MyBatis)

我们使用OpenTelemetry的Java Agent,这是最无侵入性的方式。只需要在AKS的Deployment YAML中为容器添加几个JVM参数。

Dockerfile片段 (示意):

# Dockerfile
FROM openjdk:17-slim
WORKDIR /app
# 下载并放置OTel Java Agent
ADD https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar .
COPY target/my-app-*.jar app.jar
# 启动命令将在AKS YAML中定义

AKS Deployment YAML片段 (deployment.yaml):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: java-business-service
spec:
  template:
    spec:
      containers:
      - name: app
        image: your-repo/java-service:latest
        env:
          - name: OTEL_SERVICE_NAME
            value: "java-business-service"
          - name: OTEL_EXPORTER_OTLP_ENDPOINT
            value: "http://otel-collector-service:4317" # 指向OTel Collector
          - name: OTEL_PROPAGATORS
            value: "tracecontext,baggage"
          - name: JAVA_TOOL_OPTIONS
            value: "-javaagent:/app/opentelemetry-javaagent.jar" # 挂载Agent
        ports:
        - containerPort: 8080

有了这个配置,Spring WebMVC的HTTP入口和MyBatis的JDBC调用都会被自动检测并生成相应的Span,同时它会自动解析传入请求中的traceparent头,并将其传播到后续的HTTP客户端调用中。

MyBatis接口示例 (无需任何改动):

// src/main/java/com/example/dataservice/mapper/QueryMapper.java
package com.example.dataservice.mapper;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.Map;

@Mapper
public interface QueryMapper {

    @Select("SELECT metadata FROM queries WHERE query_id = #{queryId}")
    Map<String, Object> getQueryMetadata(String queryId);
}

Python AI服务 (Flask)

对于Python服务,我们使用OpenTelemetry SDK进行手动但简单的配置。

依赖 (requirements.txt):

flask
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-grpc
opentelemetry-instrumentation-flask
opentelemetry-instrumentation-requests

服务初始化代码 (app.py):

# app.py
import logging
from flask import Flask, request, jsonify

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
import time
import random

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. 配置 OpenTelemetry
resource = Resource(attributes={"service.name": "python-ai-service"})
provider = TracerProvider(resource=resource)
# 使用OTLP gRPC导出器,指向OTel Collector
otlp_exporter = OTLPSpanExporter(endpoint="otel-collector-service:4317", insecure=True)
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# 2. 初始化 Flask 应用
app = Flask(__name__)

# 3. 自动仪表化 Flask 和 requests
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()

@app.route('/predict', methods=['POST'])
def predict():
    """
    一个模拟的AI预测端点
    """
    # FlaskInstrumentor 会自动从请求头中提取trace context
    with tracer.start_as_current_span("ai_model_processing") as span:
        try:
            data = request.get_json()
            if not data or 'features' not in data:
                return jsonify({"error": "Missing features"}), 400

            # 模拟复杂的计算
            processing_time = 0.5 + random.uniform(0.1, 0.4)
            time.sleep(processing_time)
            span.set_attribute("model.name", "complex_model_v1")
            span.set_attribute("processing.time_ms", int(processing_time * 1000))

            result = {"prediction": random.randint(0, 100)}
            logger.info(f"Prediction successful for features: {data['features']}")
            return jsonify(result)

        except Exception as e:
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, "Model processing failed"))
            logger.error(f"Error during prediction: {e}", exc_info=True)
            return jsonify({"error": "Internal server error"}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)

至此,后端链路已经准备就绪,可以接收并传播W3C追踪上下文了。

核心:自定义Rollup插件的实现

现在我们来解决最关键的部分:在前端构建时自动注入追踪逻辑。我们将编写一个名为otel-api-injector的Rollup插件。

项目结构:

frontend/
├── node_modules/
├── plugins/
│   └── otel-api-injector.js  # 我们的自定义插件
├── src/
│   ├── api.js
│   └── main.js
├── package.json
└── rollup.config.js

安装AST处理依赖:
我们需要acorn来将代码解析成AST,以及estree-walker来遍历这个树。

npm install acorn estree-walker magic-string --save-dev

magic-string是一个非常实用的库,它可以在不破坏原始代码sourcemap的情况下对代码进行修改。

自定义插件 plugins/otel-api-injector.js:

// plugins/otel-api-injector.js
import { walk } from 'estree-walker';
import MagicString from 'magic-string';
import { parse } from 'acorn';

const API_CLIENT_MODULE_NAME = 'api.js'; // 我们假定所有API调用都定义在这个文件中

/**
 * 一个自定义Rollup插件,用于自动为 `fetch` 调用注入 OpenTelemetry 追踪头。
 * 它通过AST分析代码,找到所有 `fetch` 调用并改写它们。
 */
export default function otelApiInjector() {
  return {
    name: 'otel-api-injector',

    /**
     * Rollup的 transform 钩子,在每个模块加载后执行。
     * @param {string} code - 模块的源代码
     * @param {string} id - 模块的ID (文件路径)
     * @returns {object|null} - 包含改写后代码和sourcemap的对象,或null
     */
    transform(code, id) {
      // 我们只关心我们自己定义的API客户端文件,避免修改node_modules中的代码
      if (!id.endsWith(API_CLIENT_MODULE_NAME)) {
        return null;
      }

      let ast;
      try {
        ast = parse(code, {
          ecmaVersion: 'latest',
          sourceType: 'module'
        });
      } catch (err) {
        this.warn({ message: `Failed to parse ${id}: ${err.message}` });
        return null;
      }

      const magicString = new MagicString(code);
      let modified = false;

      walk(ast, {
        enter(node) {
          // 目标: 找到所有 `fetch(url, options)` 的调用表达式
          if (node.type !== 'CallExpression' || node.callee.name !== 'fetch') {
            return;
          }

          modified = true;
          this.skip(); // 找到后不再深入子节点

          // 注入的辅助函数代码,用于获取和附加追踪头
          const injectorHelper = `
(async () => {
    // 动态导入OTel API以支持代码分割和延迟加载
    const { trace, context } = await import('@opentelemetry/api');
    const { W3CTraceContextPropagator } = await import('@opentelemetry/core');

    const propagator = new W3CTraceContextPropagator();
    const activeContext = context.active();
    const headers = {};
    propagator.inject(activeContext, headers);
    return headers;
})()
`;

          const fetchArgs = node.arguments;

          if (fetchArgs.length === 1) {
            // 原调用: fetch(url)
            // 改写为: fetch(url, { headers: await injectorHelper })
            magicString.appendLeft(fetchArgs[0].end, `, { headers: await ${injectorHelper} }`);
          } else if (fetchArgs.length > 1 && fetchArgs[1].type === 'ObjectExpression') {
            // 原调用: fetch(url, { method: 'POST', ... })
            const optionsNode = fetchArgs[1];
            const headersProp = optionsNode.properties.find(
              p => p.key.name === 'headers'
            );

            if (headersProp) {
              // headers 已存在: { headers: { ...existingHeaders }, ... }
              // 改写为: { headers: { ...existingHeaders, ...await injectorHelper }, ... }
              const headersValue = headersProp.value;
              if (headersValue.type === 'ObjectExpression') {
                  magicString.appendLeft(headersValue.end - 1, `, ...(await ${injectorHelper})`);
              } else {
                  // 对于动态headers对象,如变量,采用更保守的合并方式
                  const originalHeaders = code.substring(headersValue.start, headersValue.end);
                  magicString.overwrite(
                      headersValue.start,
                      headersValue.end,
                      `{ ...(${originalHeaders}), ...(await ${injectorHelper}) }`
                  );
              }
            } else {
              // headers 不存在: { method: 'POST' }
              // 改写为: { headers: await injectorHelper, method: 'POST' }
              magicString.appendLeft(optionsNode.start + 1, ` headers: await ${injectorHelper},`);
            }
          } else {
              // 捕获不常见的 fetch 用法, 比如 fetch(url, someVariable)
              this.warn({
                  message: `Skipping complex fetch call in ${id} at line ${node.loc.start.line}. Manual instrumentation may be required.`
              });
          }
        }
      });

      if (!modified) {
        return null;
      }

      return {
        code: magicString.toString(),
        map: magicString.generateMap({ hires: true })
      };
    }
  };
}

插件代码解析:

  1. name: 插件的唯一标识。
  2. transform hook: 这是插件的核心。Rollup会为每个处理的文件调用这个钩子。
  3. 路径过滤: 我们只对特定的api.js文件进行转换,这是一个常见的性能优化和安全措施,避免意外修改第三方库。
  4. AST解析: 使用acorn.parse将源代码字符串转换为AST。这是静态分析和代码改写的基础。
  5. estree-walker: 我们用它来深度优先遍历AST。enter函数在进入每个节点时被调用。
  6. 节点匹配: 我们寻找类型为CallExpression且其callee(被调用的函数)名为fetch的节点。
  7. MagicString改写: 一旦找到目标fetch调用,我们不直接修改字符串,而是使用magicString记录修改。它能智能地处理插入、覆盖等操作,并最终生成正确的代码和sourcemap。
  8. 注入逻辑: 注入的代码块是一个立即执行的异步函数,它动态导入OpenTelemetry的API,创建W3CTraceContextPropagator,并生成包含traceparentheaders对象。
  9. 处理不同fetch用法: 代码考虑了fetch(url)fetch(url, options)两种情况,并对options中是否已存在headers做了不同处理,确保了代码的健壮性。

配置Rollup (rollup.config.js):

// rollup.config.js
import resolve from '@rollup/plugin-node-resolve';
import commonjs from '@rollup/plugin-commonjs';
import otelApiInjector from './plugins/otel-api-injector.js'; // 导入我们的插件

export default {
  input: 'src/main.js',
  output: {
    file: 'dist/bundle.js',
    format: 'iife',
    sourcemap: true,
  },
  plugins: [
    otelApiInjector(), // 在其他插件之前或之后,取决于你的需求,这里放在前面
    resolve(),
    commonjs(),
  ]
};

前端业务代码 (src/api.jssrc/main.js)

src/api.js (被插件转换的目标文件):

// src/api.js

export async function getAnalysis(query) {
    // 开发者编写的代码,非常干净,没有追踪相关的逻辑
    const response = await fetch('/api/analyze', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify({ query })
    });
    if (!response.ok) {
        throw new Error('Network response was not ok');
    }
    return response.json();
}

export async function getSimpleData() {
    // 另一个没有options的fetch调用
    const response = await fetch('/api/data');
    return response.json();
}

src/main.js (前端入口与OpenTelemetry初始化):

// src/main.js
import { getAnalysis } from './api.js';

// OpenTelemetry Web 初始化
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { getWebAutoInstrumentations } from '@opentelemetry/auto-instrumentations-web';

const provider = new WebTracerProvider({
    // ... resource config
});

provider.addSpanProcessor(new SimpleSpanProcessor(new OTLPTraceExporter({
    url: 'http://localhost:4318/v1/traces', // 指向本地或远程的OTel Collector
})));

provider.register({
    contextManager: new ZoneContextManager(),
});

registerInstrumentations({
  instrumentations: [
    getWebAutoInstrumentations({
      // 禁用 fetch instrumentation,因为我们已经通过插件手动注入了上下文
      // 如果开启,可能会导致重复的头
      '@opentelemetry/instrumentation-fetch': {
        enabled: false, 
      },
    }),
  ],
});


// 业务逻辑
document.getElementById('runAnalysisBtn').addEventListener('click', async () => {
    try {
        const result = await getAnalysis('complex user query');
        console.log('Analysis Result:', result);
    } catch (error) {
        console.error('Failed to get analysis:', error);
    }
});

当执行rollup -c构建后,dist/bundle.js中对应的getAnalysis函数会被自动改写成类似这样:

// dist/bundle.js (示意性,非真实代码)
async function getAnalysis(query) {
    const response = await fetch('/api/analyze', {
        // 插件注入的headers逻辑
        headers: { ...(await (async () => {
            const { trace, context } = await import('@opentelemetry/api');
            const { W3CTraceContextPropagator } = await import('@opentelemetry/core');
            const propagator = new W3CTraceContextPropagator();
            const activeContext = context.active();
            const headers = {};
            propagator.inject(activeContext, headers);
            return headers;
        })()), 'Content-Type': 'application/json' },
        method: 'POST',
        body: JSON.stringify({ query })
    });
    // ...
}

这实现了我们的核心目标:业务代码保持干净,追踪能力在构建时被透明地织入。

当前方案的局限性与未来展望

这种基于AST的构建时注入方案优雅地解决了前端追踪上下文自动传播的问题,但也存在其适用边界。

首先,我们的插件目前只处理了全局的fetch函数调用。如果项目中使用了像axios这样的HTTP库,或者开发者对fetch进行了封装,AST的匹配逻辑就需要变得更加复杂和智能,可能需要识别库的特定API模式。

其次,对于动态生成的代码或使用eval执行的API调用,这种静态分析方法是无能为力的。幸运的是,在现代前端工程实践中,这类用法已非常罕见。

未来的一个优化方向是提升插件的配置能力,例如允许用户通过配置定义需要被注入的函数名或模块,使其不仅仅局限于api.js中的fetch。另一个更有趣的探索是,结合eBPF等技术在AKS集群的服务网格层实现上下文传播,但这将把问题从构建时转移到运行时,是另一种完全不同的技术路径,各有优劣。对于我们目前的应用场景,一个健壮的Rollup插件已足够应对挑战,并为开发团队提供了近乎零成本的可观测性接入体验。


  目录