在 Vercel Functions 中利用 Babel 动态转译实现一个轻量级 GraphQL Feature Store


团队最近的一个项目需要在边缘节点上进行实时用户意图预测,这是一个典型的 ML 应用场景。传统的做法是部署一个独立的、通常是基于 Python 的模型服务,并连接到一个中心化的 Feature Store。但在我们这个场景下,整个业务栈是构建在 Vercel 上的 JAMstack 架构,引入一套重型的、需要独立运维的 Feature Store 体系(如 Feast 或 Tecton)不仅技术栈不匹配,运维成本也显得过高。

痛点很明确:我们需要一个轻量、无服务器、且能与我们现有 Node.js 技术栈无缝集成的方案,来提供实时特征计算。最直接的想法是,能否将“特征”本身也视为代码,即一系列部署在 Vercel Functions 上的纯函数。这些函数接收实体 ID(如 userId),从 Upstash Redis 或 Vercel KV 这类低延迟数据源拉取原始数据,然后计算并返回特征值。

这个初步构想解决了运维重的问题,但引入了新的挑战:数据科学家和算法工程师并非前端或后端开发者,他们习惯用最新的 JavaScript 语法(甚至是处于 Stage-X 的提案)或者 Python/SQL-like 的 DSL 来快速迭代特征逻辑。让他们去处理 Vercel 的部署配置、项目构建、依赖管理,会极大地拖慢迭代速度。

我们需要一个系统,能让算法工程师专注于特征逻辑本身,提交纯粹的逻辑代码,而系统能自动地、动态地将这些代码集成到线上服务中。这就把问题引向了元编程和运行时编译。如果 Vercel Function 能够在接收到请求时,动态加载特征定义文件,实时将其转译为可执行代码,然后运行,问题似乎就迎刃而解。这正是 Babel 的用武之地——不再是构建时工具,而是作为服务运行时的一个核心组件。

最终的架构决策是:使用 GraphQL 作为统一的特征查询入口,它允许客户端(模型服务)按需批量获取特征;Vercel Functions 作为计算引擎;而核心的动态性,则通过在 Function 内部以编程方式调用 Babel API 来实现。

架构设计与数据流

在深入代码之前,我们先用图表理清整个请求的生命周期。当一个特征查询请求到达时,系统内部的流转如下:

sequenceDiagram
    participant Client as 客户端 (模型服务)
    participant Vercel as Vercel Edge Network
    participant GraphQLFn as GraphQL Function (/api/features)
    participant Registry as 特征定义存储 (e.g., S3/GitHub)
    participant Babel as Babel Core (In-Process)
    participant DataSource as 原始数据源 (e.g., Upstash Redis)

    Client->>Vercel: POST /api/features (GraphQL Query)
    Vercel->>GraphQLFn: Invoke Function with Request
    GraphQLFn->>GraphQLFn: 解析查询,获取 featureNames: ["f1", "f2"]
    
    loop 遍历每个 featureName
        GraphQLFn->>Registry: 读取 "f1.js" 的源码
        Registry-->>GraphQLFn: 返回源码字符串
        GraphQLFn->>Babel: programmaticallyInvoke(sourceCode)
        Babel-->>GraphQLFn: 返回转译后的可执行代码字符串
        GraphQLFn->>GraphQLFn: JIT编译执行 (e.g., new Function())
        Note right of GraphQLFn: 此时获得特征计算函数
    end

    GraphQLFn->>DataSource: 批量获取原始数据 (e.g., MGET user:123:profile)
    DataSource-->>GraphQLFn: 返回原始数据
    
    loop 再次遍历每个 feature
        GraphQLFn->>GraphQLFn: executeFeatureFunction(rawData)
        Note right of GraphQLFn: 计算出 "f1" 的值
    end

    GraphQLFn-->>Vercel: 返回 GraphQL Response
    Vercel-->>Client: 响应特征数据

这个架构的核心在于 GraphQLFn 内部的逻辑:它不仅是 API 的提供者,更是一个小型的、按需的编译和执行引擎。

项目搭建与核心依赖

我们从一个标准的 Vercel Node.js 项目开始。

package.json 的核心依赖如下:

{
  "name": "serverless-feature-store",
  "version": "1.0.0",
  "private": true,
  "scripts": {
    "dev": "vercel dev"
  },
  "dependencies": {
    "@babel/core": "^7.23.2",
    "@babel/preset-env": "^7.23.2",
    "graphql": "^16.8.1",
    "graphql-yoga": "^5.0.0",
    "lru-cache": "^10.0.1",
    "redis": "^4.6.10"
    // 对于安全性要求更高的场景,考虑使用 vm2 替代 new Function
    // "vm2": "^3.9.19" 
  }
}

这里的关键是 @babel/core@babel/preset-env,它们将用于在服务器端进行代码转译。graphql-yoga 是一个轻量且功能强大的 GraphQL 服务器实现,非常适合 Vercel Functions。lru-cache 用于缓存转译后的函数,这是至关重要的性能优化点。

动态转译器的实现

这是整个系统的技术核心。我们创建一个 transpiler.js 服务,它负责接收源码字符串,返回一个可执行的函数。在真实项目中,直接使用 evalnew Function 存在安全风险,一个常见的错误是认为只要代码源可信就没问题。但依赖链污染等问题依然存在。对于内部系统,如果特征代码源头(如特定Git仓库)是严格受控的,可以简化处理;否则,必须使用 vm2 等沙箱环境。

api/lib/transpiler.js:

const Babel = require('@babel/core');
const { LRUCache } = require('lru-cache');

// 缓存配置:最多缓存500个转译后的函数,有效期1小时
// 在Vercel Serverless环境中,实例生命周期有限,这个缓存主要用于处理短时间内的重复请求(热函数)
const options = {
  max: 500,
  ttl: 1000 * 60 * 60, 
};
const cache = new LRUCache(options);

// Babel 配置,以编程方式定义
// 这里的坑在于:必须确保目标环境与Vercel Node.js运行时版本匹配
// 比如 Vercel 当前使用 Node.js 18.x,那么 targets.node 应该设置为 "18"
const babelConfig = {
  presets: [
    ['@babel/preset-env', {
      targets: {
        node: '18',
      },
    }],
  ],
  sourceType: 'module', // 假设特征文件是ESM格式
  comments: false,
};

/**
 * 动态转译并编译JS源码
 * @param {string} featureName - 特征名称,用作缓存键
 * @param {string} sourceCode - 从特征仓库获取的JS源码字符串
 * @returns {Promise<Function>} - 返回一个异步的、可执行的特征计算函数
 */
async function getFeatureFunction(featureName, sourceCode) {
  // 步骤1: 检查缓存
  if (cache.has(featureName)) {
    console.log(`[Cache HIT] for feature: ${featureName}`);
    return cache.get(featureName);
  }

  console.log(`[Cache MISS] Compiling feature: ${featureName}`);

  try {
    // 步骤2: 调用Babel Core进行转译
    // 这里是异步的,因为Babel的某些插件或presets可能是异步的
    const result = await Babel.transformAsync(sourceCode, babelConfig);

    if (!result || !result.code) {
      throw new Error(`Babel transpilation failed for ${featureName}: No code generated.`);
    }

    const transpiledCode = result.code;
    
    // 步骤3: JIT编译。这是整个流程中需要高度关注安全性的部分。
    // 我们期望特征文件导出一个名为 `compute` 的异步函数。
    // 通过包装成一个异步立即执行函数表达式(IIFE),我们可以通过 `eval` 获取其导出。
    // 在生产环境中,强烈建议用 vm2 替换。
    // const { VM } = require('vm2');
    // const vm = new VM({ timeout: 1000, sandbox: {} });
    // const featureModule = vm.run(transpiledCode);
    // const featureFunction = featureModule.compute;

    // 使用 new Function 的简化实现,适用于内部可信环境
    // 注意:`new Function` 的作用域是全局的,为了获取 ESM 的 `exports`,我们需要一些技巧。
    // 一个常见的做法是将 CommonJS 风格的模块系统注入进去。
    const wrappedCode = `
      const exports = {};
      (async () => {
        ${transpiledCode.replace(/export\s+default\s+/, 'exports.default = ')}
      })();
      return exports.default;
    `;
    
    const factory = new Function(wrappedCode);
    const featureFunction = factory();


    if (typeof featureFunction !== 'function') {
      throw new Error(`Feature file ${featureName} did not export a default function after transpilation.`);
    }

    // 步骤4: 存入缓存
    cache.set(featureName, featureFunction);

    return featureFunction;
  } catch (error) {
    // 详尽的错误日志至关重要
    console.error(`Failed to transpile or compile feature '${featureName}'.`, {
      message: error.message,
      stack: error.stack,
    });
    // 返回一个固定的错误处理函数,避免整个GraphQL请求失败
    // 这样设计可以实现部分降级,一个特征的失败不影响其他特征
    return async () => ({ error: `Feature computation failed: ${error.message}` });
  }
}

// 模拟从外部存储获取特征定义
// 在真实项目中,这里会是 S3, GitHub API, 或者数据库的调用
const featureRegistry = {
  'user_session_count_7d': `
    // 使用了 Optional Chaining 和 Nullish Coalescing
    // 这些语法可能不被基础 Node.js 18 支持,需要Babel转译
    export default async function compute(redisClient, entityId) {
      const keys = Array.from({ length: 7 }, (_, i) => {
        const d = new Date();
        d.setDate(d.getDate() - i);
        const dateStr = d.toISOString().split('T')[0];
        return \`user:\${entityId}:sessions:\${dateStr}\`;
      });

      const results = await redisClient.mGet(keys);
      return results?.reduce((sum, count) => sum + (parseInt(count, 10) ?? 0), 0) ?? 0;
    }
  `,
  'user_has_active_subscription': `
    // 这是一个简单的特征,直接查询一个键
    export default async function compute(redisClient, entityId) {
      const subStatus = await redisClient.get(\`user:\${entityId}:subscription_status\`);
      return subStatus === 'active';
    }
  `,
};

function getFeatureSource(featureName) {
  // 模拟异步获取
  return Promise.resolve(featureRegistry[featureName]);
}

module.exports = { getFeatureFunction, getFeatureSource };

这个模块封装了缓存、转译和编译的核心逻辑。注意错误处理部分,我们没有直接抛出异常,而是返回一个会产生错误结果的函数。这种容错设计在生产环境中非常重要,它保证了即使某个特征的定义有语法错误,也不会导致整个API请求崩溃。

GraphQL 服务实现

接下来是 GraphQL 的入口文件。

api/features.js:

const { createYoga } = require('graphql-yoga');
const { createSchema, createYoga } = require('graphql-yoga');
const { createClient } = require('redis');
const { getFeatureFunction, getFeatureSource } = require('./lib/transpiler');

// 在 serverless 环境中,连接最好在外部定义,以便复用
const redisClient = createClient({
  url: process.env.UPSTASH_REDIS_URL,
});
redisClient.on('error', err => console.error('Redis Client Error', err));
// 确保在函数调用之间保持连接
if (!redisClient.isOpen) {
    redisClient.connect();
}


const typeDefs = /* GraphQL */ `
  type Feature {
    name: String!
    value: String
    error: String
  }

  type Query {
    getFeatures(entityId: ID!, featureNames: [String!]!): [Feature!]!
  }
`;

const resolvers = {
  Query: {
    getFeatures: async (_, { entityId, featureNames }) => {
      if (!entityId || !featureNames || featureNames.length === 0) {
        throw new Error('entityId and featureNames are required.');
      }
      
      // 在真实项目中,应该对 featureNames 的数量和名称格式进行校验
      if (featureNames.length > 20) {
        throw new Error('Cannot request more than 20 features at a time.');
      }

      console.log(`Processing features for entity: ${entityId}`, featureNames);

      // 并行获取所有特征的源码和转译后的计算函数
      const featurePromises = featureNames.map(async (name) => {
        const sourceCode = await getFeatureSource(name);
        if (!sourceCode) {
          return { 
            name, 
            fn: async () => ({ error: 'Feature definition not found.' }) 
          };
        }
        const fn = await getFeatureFunction(name, sourceCode);
        return { name, fn };
      });

      const featuresWithFuncs = await Promise.all(featurePromises);

      // 现在我们有了所有特征的计算函数,可以开始执行它们
      // 这里的优化点在于,如果多个特征依赖相同的原始数据,可以在这里做一次批处理查询
      // 但为简化示例,我们假设每个特征函数内部自己处理数据获取
      const resultsPromises = featuresWithFuncs.map(async ({ name, fn }) => {
        try {
          // 每个特征函数接收 redis 客户端实例和实体ID
          const value = await fn(redisClient, entityId);

          if (value && typeof value.error !== 'undefined') {
              return { name, value: null, error: value.error };
          }
          
          // GraphQL 不直接支持所有JS类型,统一序列化为字符串
          return { name, value: JSON.stringify(value), error: null };
        } catch (e) { {
          console.error(`Error executing feature '${name}' for entity '${entityId}'`, e);
          return { name, value: null, error: 'Internal computation error.' };
        }
      });

      return Promise.all(resultsPromises);
    },
  },
};

const schema = createSchema({
  typeDefs,
  resolvers,
});

// 将 Yoga server 暴露给 Vercel
export default createYoga({
  schema,
  graphqlEndpoint: '/api/features',
  // 在Vercel上禁用GraphiQL界面,除非是开发环境
  graphiql: process.env.NODE_ENV !== 'production',
});

这个 GraphQL resolver 是整个流程的调度器。它首先并行地为每个请求的特征获取并编译其计算函数(会利用缓存),然后并行地执行这些函数,最后将结果聚合并返回。输入验证(如限制一次请求的特征数量)是保护系统不被滥用的基本措施。

测试与验证

部署到 Vercel 后,我们可以用一个简单的 curl 命令来测试:

curl -X POST \
  -H "Content-Type: application/json" \
  -d '{ "query": "query GetUserFeatures($entityId: ID!, $featureNames: [String!]!) { getFeatures(entityId: $entityId, featureNames: $featureNames) { name value error } }", "variables": { "entityId": "user-123", "featureNames": ["user_session_count_7d", "user_has_active_subscription", "non_existent_feature"] } }' \
  https://<your-vercel-deployment-url>/api/features

预期的返回结果会是:

{
  "data": {
    "getFeatures": [
      {
        "name": "user_session_count_7d",
        "value": "25",
        "error": null
      },
      {
        "name": "user_has_active_subscription",
        "value": "true",
        "error": null
      },
      {
        "name": "non_existent_feature",
        "value": null,
        "error": "Feature definition not found."
      }
    ]
  }
}

这个结果清晰地展示了系统的能力:成功计算了存在的特征,并对不存在或计算失败的特征返回了明确的错误信息,整个查询依然是成功的。

局限性与未来迭代路径

尽管这个方案巧妙地解决了在 Serverless 环境下构建动态 Feature Store 的问题,但它并非银弹,在生产环境中应用需要清醒地认识其边界。

1. 性能开销与冷启动: 最大的瓶颈在于首次执行一个特征时的“编译延迟”。尽管有 LRU 缓存,但 Vercel Function 的冷启动本身就会导致高延迟,再加上动态加载源码、Babel 转译、JIT 编译,第一个请求的响应时间可能会达到数秒。对于延迟敏感的在线预测场景,这是不可接受的。优化方向是将缓存从内存转移到外部高速缓存(如 Redis),让不同函数实例可以共享缓存。

2. 安全性: 如前所述,new Functioneval 是一个巨大的安全隐患。虽然 vm2 提供了沙箱,但它也并非绝对安全,且有性能损耗。这个架构更适合于一个内部平台,所有特征代码的提交都经过严格的 Code Review。

3. 依赖管理: 当前实现下的特征函数是无依赖的“孤岛”。如果特征逻辑需要使用 lodashdate-fns 这类外部库怎么办?这是一个非常棘手的问题。一种可行的方案是维护一个“平台依赖包”,将一组预先审核过的、常用的库打包进 Serverless Function 的部署包中,然后在沙箱环境执行时作为全局变量或通过模块加载器注入。但这增加了系统的复杂性和耦合度。

4. 演进方向: 一个更成熟、更健壮的架构应该将“动态”部分从请求时(runtime)移动到部署时(deploy-time)。可以设计一个 GitOps 流程:当特征仓库有新的提交时,CI/CD 流水线自动触发,它会独立地对每个变更的特征文件运行 Babel 转译,然后将转译后的 JS 文件作为静态资产上传到 S3 或类似存储。GraphQL Function 则不再需要内置 Babel,而是直接从 S3 加载预编译好的代码。这种方式牺牲了一部分动态性(特征更新不再是“即时”的),但换来了极大的性能提升和更高的安全性,因为它彻底避免了在运行时执行未经审核的编译过程。


  目录