构建基于 CDC 和 LLM 的 Node.js 实时数据管道同步 Meilisearch 索引


项目的搜索功能需求从简单的关键词匹配演变成了语义理解。这意味着我们不能再依赖数据库的 LIKE 查询。引入全文搜索引擎是必然选择,我们看中了 Meilisearch 的速度和易用性。但真正的问题在于数据同步:如何保证主业务数据库(PostgreSQL)中的数据变更能实时、可靠地反映到 Meilisearch 索引中,并且这个过程需要用 LLM 进行数据增强(例如生成向量嵌入)?

最初的方案是双写:在业务代码中,每次更新数据库后,再异步调用一次 Meilisearch 的 API。这个方案很快被否决。它严重侵入业务逻辑,增加了代码耦合度,并且无法保证数据一致性——如果数据库事务成功,但写入 Meilisearch 失败,数据就会出现偏差。

我们需要一个解耦的、可靠的方案。最终,我们将目光投向了变更数据捕获(Change Data Capture, CDC)。其核心思想是,让数据库成为唯一的事实来源,我们通过监听数据库的预写日志(WAL)来捕获所有 INSERT, UPDATE, DELETE 操作,然后将这些变更事件作为流数据进行处理。

整个架构蓝图逐渐清晰:

  1. 数据库 (PostgreSQL): 业务数据的主存储。
  2. Debezium: 作为 CDC 工具,伪装成一个从库,读取 PostgreSQL 的 WAL 日志,并将变更事件生成为结构化消息。
  3. Kafka: 作为消息代理,接收 Debezium 产生的变更事件,为下游消费提供高吞吐和解耦的缓冲层。
  4. Node.js 同步服务: 消费 Kafka 中的数据变更事件,执行核心处理逻辑:
    • 调用 LLM 服务为数据生成向量嵌入。
    • 将原始数据与向量嵌入合并。
    • 调用 Meilisearch API 更新或删除索引。
  5. ORM (Prisma): 主要用于一个辅助功能——全量数据同步。当系统初次上线或索引结构发生重大变更时,我们需要一个脚本来读取整个数据库,并将其灌入 Meilisearch。Prisma 的类型安全和简洁 API 在此场景下非常合适。
  6. Meilisearch: 提供最终的语义搜索服务。

下面是这个系统的架构图:

graph TD
    subgraph "Source Database"
        A[PostgreSQL]
    end

    subgraph "CDC & Messaging"
        B[Debezium Connector]
        C[Kafka Connect]
        D[Kafka Topics]
    end

    subgraph "Synchronization Service (Node.js)"
        E[Kafka Consumer]
        F{Data Transformer}
        G[LLM Embedding Service]
        H[Meilisearch Client]
        I[Prisma Client for Full Sync]
    end

    subgraph "Search Engine"
        J[Meilisearch Index]
    end

    A -- WAL --> B
    B -- Records --> C
    C -- Publishes --> D
    E -- Consumes --> D
    E --> F
    F --> G
    G --> F
    F --> H
    H -- Upserts/Deletes --> J
    I -- Reads All --> A
    I -- Pushes to --> H

环境搭建与配置

在真实项目中,这些组件会分布在不同的服务器或容器中。为了方便演示,我们使用 docker-compose 将整个环境一键启动。

docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  postgres:
    image: debezium/postgres:14
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: products_db
    volumes:
      - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql

  connect:
    image: debezium/connect:2.1
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: 'kafka:29092'
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses

  meilisearch:
    image: getmeili/meilisearch:v1.3
    container_name: meilisearch
    ports:
      - "7700:7700"
    environment:
      MEILI_MASTER_KEY: 'MASTER_KEY_SECRET'
      MEILI_ENV: 'development'
    volumes:
      - ./meili_data:/meili_data

  # 为了方便演示LLM embedding,这里使用Ollama
  ollama:
    image: ollama/ollama:latest
    container_name: ollama
    ports:
      - "11434:11434"
    volumes:
      - ./ollama_data:/root/.ollama
    # 注意:在生产中你可能需要GPU支持
    # deploy:
    #   resources:
    #     reservations:
    #       devices:
    #         - driver: nvidia
    #           count: 1
    #           capabilities: [gpu]

我们还需要为 PostgreSQL 准备一个初始化脚本 db/init.sql,它会创建我们的业务表并开启逻辑复制。

db/init.sql:

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Debezium 需要 `REPLICA IDENTITY FULL` 来获取 UPDATE 操作前后的完整数据
ALTER TABLE products REPLICA IDENTITY FULL;

-- 插入一些初始数据
INSERT INTO products (name, description, price) VALUES
('Laptop Pro X', 'A powerful laptop for professionals.', 1499.99),
('Wireless Mouse', 'Ergonomic wireless mouse with long battery life.', 49.99),
('Mechanical Keyboard', 'RGB backlit mechanical keyboard with blue switches.', 129.50);

环境启动后,需要通过 API 向 Debezium 的 Kafka Connect 注册我们的 PostgreSQL 连接器。

register-debezium-connector.sh:

#!/bin/bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "products-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "products_db",
    "database.server.name": "pgserver1",
    "table.include.list": "public.products",
    "plugin.name": "pgoutput",
    "topic.prefix": "db.events"
  }
}'

执行此脚本后,Debezium 会开始监听 products 表的变更,并将事件发布到 Kafka 的 db.events.public.products 主题中。

Node.js 同步服务的实现

这是整个系统的核心。我们将使用 TypeScript 构建,并依赖 kafkajs, meilisearch, @prisma/client, 和 ollama 等库。

项目结构:

/sync-service
|-- /src
|   |-- config.ts
|   |-- kafka
|   |   |-- consumer.ts
|   |   |-- index.ts
|   |-- services
|   |   |-- embedding.service.ts
|   |   |-- meilisearch.service.ts
|   |   |-- processing.service.ts
|   |-- main.ts
|-- /prisma
|   |-- schema.prisma
|-- package.json
|-- tsconfig.json
|-- full-sync.ts

1. 配置与客户端初始化

src/config.ts:

// 在真实项目中,这些配置应该来自环境变量
export const config = {
  kafka: {
    brokers: ['localhost:9092'],
    clientId: 'meilisearch-sync-service',
    groupId: 'meilisearch-sync-group',
    topic: 'db.events.public.products',
  },
  meilisearch: {
    host: 'http://localhost:7700',
    apiKey: 'MASTER_KEY_SECRET',
    indexName: 'products',
  },
  ollama: {
    host: 'http://localhost:11434',
    model: 'nomic-embed-text', // 一个轻量级的文本嵌入模型
  },
  processing: {
    batchSize: 32, // 批量处理事件以提高LLM效率
    batchTimeoutMs: 1000, // 或等待1秒
  }
};

2. LLM 嵌入服务

直接为每个数据库变更调用一次 LLM 是非常低效且昂贵的。这里的关键是实现批量处理。我们将收集一定数量的事件或等待一定时间,然后将一批文本一次性发送给 LLM。

src/services/embedding.service.ts:

import { Ollama } from 'ollama';
import { config } from '../config';

class EmbeddingService {
  private ollama: Ollama;

  constructor() {
    this.ollama = new Ollama({ host: config.ollama.host });
    this.ensureModelExists();
  }

  private async ensureModelExists() {
    try {
        // 提前拉取模型,避免首次调用时过慢
        console.log(`Checking for Ollama model: ${config.ollama.model}...`);
        await this.ollama.pull({ model: config.ollama.model, stream: false });
        console.log('Ollama model is ready.');
    } catch (error) {
        console.error('Failed to pull Ollama model. Ensure Ollama service is running and the model exists.', error);
        process.exit(1);
    }
  }
  
  /**
   * 为一批文本生成向量嵌入
   * @param texts 要处理的文本数组
   * @returns 返回一个包含向量的数组,顺序与输入对应
   */
  public async generateEmbeddings(texts: string[]): Promise<number[][]> {
    if (!texts || texts.length === 0) {
      return [];
    }

    console.log(`Generating embeddings for a batch of ${texts.length} texts...`);
    
    // Ollama 的 JS 客户端目前似乎没有显式的 batch generate API,
    // 我们通过 Promise.all 并发执行。
    // 在生产中,如果LLM服务支持批量接口,务必使用它。
    try {
      const embeddings = await Promise.all(
        texts.map(text => this.ollama.embeddings({
            model: config.ollama.model,
            prompt: text,
        }))
      );
      return embeddings.map(e => e.embedding);
    } catch (error) {
        console.error('Error generating embeddings:', error);
        // 如果失败,返回一个长度匹配的空数组或抛出异常,让上层处理重试
        throw new Error('Failed to generate embeddings');
    }
  }
}

export const embeddingService = new EmbeddingService();

3. Meilisearch 服务

这个服务封装了与 Meilisearch 的交互,处理文档的增、删、改。

src/services/meilisearch.service.ts:

import { MeiliSearch } from 'meilisearch';
import { config } from '../config';

type ProductDocument = {
  id: number;
  name: string;
  description: string;
  price: number;
  _vectors: {
    default: number[];
  };
};

class MeiliSearchService {
  private client: MeiliSearch;
  private indexName: string = config.meilisearch.indexName;

  constructor() {
    this.client = new MeiliSearch({
      host: config.meilisearch.host,
      apiKey: config.meilisearch.apiKey,
    });
  }

  public async initializeIndex() {
    try {
      await this.client.getIndex(this.indexName);
    } catch (error) {
      console.log(`Index '${this.indexName}' not found. Creating it...`);
      await this.client.createIndex(this.indexName, { primaryKey: 'id' });
    }

    // 配置索引以支持向量搜索
    await this.client.index(this.indexName).updateSettings({
      embedders: {
        default: {
          source: 'userProvided',
          dimensions: 768, // nomic-embed-text 模型的维度
        },
      },
      filterableAttributes: ['price'],
      sortableAttributes: ['price', 'updated_at'], // 假设我们也会同步时间戳
    });
    console.log(`Meilisearch index '${this.indexName}' is configured.`);
  }

  public async upsertDocuments(documents: ProductDocument[]) {
    if (documents.length === 0) return;
    try {
      const task = await this.client.index(this.indexName).addDocuments(documents, {
        primaryKey: 'id',
      });
      console.log(`Upserted ${documents.length} documents. Task ID: ${task.taskUid}`);
    } catch (error) {
      console.error('Failed to upsert documents to Meilisearch:', error);
      // 这里的错误需要被上层捕获并处理,可能需要重试
      throw error;
    }
  }

  public async deleteDocuments(ids: number[]) {
    if (ids.length === 0) return;
    try {
      const task = await this.client.index(this.indexName).deleteDocuments(ids);
      console.log(`Deleted ${ids.length} documents. Task ID: ${task.taskUid}`);
    } catch (error) {
      console.error('Failed to delete documents from Meilisearch:', error);
      throw error;
    }
  }
}

export const meiliSearchService = new MeiliSearchService();

4. 核心处理逻辑

这是连接 Kafka 消费、LLM 增强和 Meilisearch 写入的桥梁。它负责解析 Debezium 消息,并实现批处理逻辑。

src/services/processing.service.ts:

import { embeddingService } from './embedding.service';
import { meiliSearchService } from './meilisearch.service';
import { config } from '../config';

// 定义Debezium消息的结构 (简化版)
interface DebeziumMessage {
  payload: {
    op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
    before: Record<string, any> | null;
    after: Record<string, any> | null;
  };
}

// 定义待处理的任务
type UpsertTask = {
    id: number;
    text: string;
    data: Record<string, any>;
};
type DeleteTask = number;

class ProcessingService {
  private upsertQueue: UpsertTask[] = [];
  private deleteQueue: DeleteTask[] = [];
  private timer: NodeJS.Timeout | null = null;

  public async processMessage(message: string) {
    try {
      const event: DebeziumMessage = JSON.parse(message);
      
      // 确保消息格式正确
      if (!event.payload) return;

      const { op, before, after } = event.payload;

      // c (create) 和 u (update) 都视为 upsert
      if ((op === 'c' || op === 'u') && after) {
        this.addToUpsertQueue(after);
      } 
      // d (delete)
      else if (op === 'd' && before) {
        this.addToDeleteQueue(before.id);
      }
      // r (read) 用于快照,也视为 upsert
      else if (op === 'r' && after) {
        this.addToUpsertQueue(after);
      }
      
      this.triggerBatchProcessing();

    } catch (error) {
      console.error('Error parsing or handling Kafka message:', error);
    }
  }

  private addToUpsertQueue(data: Record<string, any>) {
    // 这里的文本组合策略很重要,它直接影响语义搜索的质量
    const textToEmbed = `${data.name || ''}: ${data.description || ''}`;
    this.upsertQueue.push({ id: data.id, text: textToEmbed, data });
  }

  private addToDeleteQueue(id: number) {
    this.deleteQueue.push(id);
  }

  private triggerBatchProcessing() {
    // 如果队列达到批次大小,立即处理
    if (this.upsertQueue.length >= config.processing.batchSize || this.deleteQueue.length >= config.processing.batchSize) {
      if (this.timer) {
        clearTimeout(this.timer);
        this.timer = null;
      }
      this.executeBatch();
    } 
    // 否则,设置一个定时器,超时后处理
    else if (!this.timer && (this.upsertQueue.length > 0 || this.deleteQueue.length > 0)) {
      this.timer = setTimeout(() => {
        this.executeBatch();
        this.timer = null;
      }, config.processing.batchTimeoutMs);
    }
  }

  private async executeBatch() {
    // 复制并清空队列,防止处理期间新消息进入
    const upsertsToProcess = [...this.upsertQueue];
    const deletesToProcess = [...this.deleteQueue];
    this.upsertQueue = [];
    this.deleteQueue = [];

    if (deletesToProcess.length > 0) {
      try {
        await meiliSearchService.deleteDocuments(deletesToProcess);
      } catch (e) {
        console.error("Batch delete failed. Re-queueing might be needed in a production system.", e);
        // 生产级错误处理:将失败的ID推送到死信队列(DLQ)或重试队列
      }
    }
    
    if (upsertsToProcess.length > 0) {
      try {
        const texts = upsertsToProcess.map(task => task.text);
        const embeddings = await embeddingService.generateEmbeddings(texts);
        
        const documents = upsertsToProcess.map((task, index) => ({
          ...task.data,
          id: task.id,
          // Meilisearch 向量字段的约定格式
          _vectors: {
            default: embeddings[index],
          },
        }));

        await meiliSearchService.upsertDocuments(documents);

      } catch (e) {
        console.error("Batch upsert failed. This is a critical error.", e);
        // 生产级错误处理:将整个批次推送到DLQ
      }
    }
  }
}

export const processingService = new ProcessingService();

5. Kafka 消费者与主程序

src/kafka/consumer.ts:

import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { config } from '../config';
import { processingService } from '../services/processing.service';

export class KafkaConsumer {
  private kafka: Kafka;
  private consumer: Consumer;

  constructor() {
    this.kafka = new Kafka({
      clientId: config.kafka.clientId,
      brokers: config.kafka.brokers,
    });
    this.consumer = this.kafka.consumer({ groupId: config.kafka.groupId });
  }

  public async connect() {
    try {
      await this.consumer.connect();
      console.log('Kafka consumer connected.');
    } catch (error) {
      console.error('Failed to connect Kafka consumer:', error);
      process.exit(1);
    }
  }

  public async subscribeAndRun() {
    await this.consumer.subscribe({ topic: config.kafka.topic, fromBeginning: true });
    console.log(`Subscribed to topic: ${config.kafka.topic}`);
    
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        if (!message.value) return;
        
        // 一个常见的错误是忘记处理心跳。
        // 如果 `eachMessage` 执行时间过长(超过 sessionTimeoutMs),
        // Kafka 会认为消费者已死并触发 rebalance。
        // kafkajs 在后台自动处理心跳,但长时间阻塞的同步代码仍会是问题。
        // 我们的批处理是异步的,所以风险较小。
        await processingService.processMessage(message.value.toString());
      },
    });
  }

  public async disconnect() {
      await this.consumer.disconnect();
  }
}

src/main.ts:

import { KafkaConsumer } from './kafka/consumer';
import { meiliSearchService } from './services/meilisearch.service';

async function bootstrap() {
  console.log('Starting synchronization service...');

  // 1. 初始化 Meilisearch 索引
  await meiliSearchService.initializeIndex();

  // 2. 创建并连接 Kafka 消费者
  const consumer = new KafkaConsumer();
  await consumer.connect();

  // 3. 订阅主题并开始消费
  await consumer.subscribeAndRun();

  // 优雅停机处理
  const errorTypes = ['unhandledRejection', 'uncaughtException'];
  errorTypes.forEach(type => {
    process.on(type, async (err) => {
      try {
        console.error(`Unhandled error: ${err}`);
        await consumer.disconnect();
        process.exit(1);
      } catch (_) {
        process.exit(1);
      }
    });
  });

  const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];
  signalTraps.forEach(type => {
    process.once(type, async () => {
      try {
        console.log('Shutdown signal received. Disconnecting...');
        await consumer.disconnect();
      } finally {
        process.kill(process.pid, type);
      }
    });
  });
}

bootstrap();

6. 全量同步脚本

这个脚本独立于实时服务,使用 Prisma 读取所有数据并推送到 Meilisearch。

prisma/schema.prisma:

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL") // "postgresql://user:password@localhost:5432/products_db"
}

model products {
  id          Int      @id @default(autoincrement())
  name        String   @db.VarChar(255)
  description String?
  price       Decimal  @db.Decimal(10, 2)
  created_at  DateTime? @default(now()) @db.Timestamptz(6)
  updated_at  DateTime? @default(now()) @db.Timestamptz(6)
}

full-sync.ts:

import { PrismaClient } from '@prisma/client';
import { embeddingService } from './src/services/embedding.service';
import { meiliSearchService } from './src/services/meilisearch.service';

const prisma = new PrismaClient();
const BATCH_SIZE = 100; // 数据库读取批次

async function main() {
  console.log('Starting full data synchronization...');
  await meiliSearchService.initializeIndex();

  let cursor: number | undefined = undefined;
  let hasMore = true;

  while (hasMore) {
    const products = await prisma.products.findMany({
      take: BATCH_SIZE,
      skip: cursor ? 1 : 0,
      cursor: cursor ? { id: cursor } : undefined,
      orderBy: { id: 'asc' },
    });

    if (products.length === 0) {
      hasMore = false;
      continue;
    }

    cursor = products[products.length - 1].id;
    console.log(`Fetched ${products.length} products. Last ID: ${cursor}`);

    const texts = products.map(p => `${p.name}: ${p.description || ''}`);
    const embeddings = await embeddingService.generateEmbeddings(texts);
    
    const documents = products.map((product, index) => ({
      ...product,
      price: Number(product.price), // Prisma Decimal -> number
      _vectors: {
        default: embeddings[index],
      },
    }));

    await meiliSearchService.upsertDocuments(documents);
  }

  console.log('Full synchronization completed.');
}

main()
  .catch((e) => {
    console.error(e);
    process.exit(1);
  })
  .finally(async () => {
    await prisma.$disconnect();
  });

局限性与未来迭代路径

这套架构虽然功能强大且解耦,但在生产环境中仍有一些需要权衡和优化的点。

首先,LLM 嵌入生成是一个明显的性能瓶颈和成本中心。我们通过批处理缓解了这个问题,但这引入了至少几秒的延迟。对于延迟极度敏感的场景,可能需要探索更小、更快的本地嵌入模型,或者评估模型服务的 GPU 资源配置。

其次,错误处理机制可以进一步强化。目前我们只是打印日志。一个完整的生产系统需要一个死信队列(DLQ)策略。当消息处理(例如LLM调用或Meilisearch写入)失败多次后,应将其移入一个单独的Kafka主题,以便后续进行手动分析和重新处理,而不是无限期地阻塞主消费流程。

最后,系统的可观测性至关重要。需要引入度量指标来监控关键环节,例如:Kafka消费延迟(Consumer Lag)、每秒处理的消息数、LLM调用的平均延迟和错误率、Meilisearch写入的成功率等。使用 Prometheus 和 Grafana 可以很好地实现这一点,帮助我们及时发现并定位问题。


  目录