构建基于 API Gateway 的声明式 Saga 事务协调器


一、问题的定义:微服务架构中的一致性难题

在微服务架构中,一个单一的业务操作,例如“创建订单”,通常需要跨越多个独立的服务:订单服务创建订单记录、库存服务扣减库存、支付服务处理支付。这些服务各自拥有独立的数据库。当支付服务成功但库存服务因网络抖动而失败时,整个系统便进入了数据不一致的状态。

传统的解决方案是将事务协调逻辑放在客户端或者一个专门的业务编排层。客户端方案将复杂性泄露给了调用方,使其变得臃肿且难以维护。而专门的编排服务,虽然实现了关注点分离,却引入了额外的网络跳数和单点故障风险,整个系统的拓扑结构也变得更为复杂。

sequenceDiagram
    participant Client
    participant API Gateway
    participant Order Service
    participant Stock Service
    participant Payment Service

    Client->>API Gateway: POST /v1/orders
    API Gateway->>Order Service: CreateOrder()
    Order Service-->>API Gateway: Order Created (ID: 123)
    API Gateway->>Stock Service: DecreaseStock()
    Stock Service-->>API Gateway: Stock Decreased
    API Gateway->>Payment Service: ProcessPayment()
    Payment Service-->>API Gateway: Payment Failed
    
    %% At this point, Order and Stock are updated, but Payment failed. Data is inconsistent.
    %% How to roll back? API Gateway needs to know the compensation logic.

这里的核心矛盾是:业务流程的原子性要求与微服务独立部署、独立演化的原则之间的冲突。我们需要一种机制,既能保证跨服务操作的最终一致性,又不会对现有服务造成过度的侵入和耦合。

二、方案权衡:外部编排器 vs. 网关内置协调器

方案A:外部 Saga 编排服务

这是一个常见的模式。我们创建一个新的微服务,例如 OrderOrchestrator,专门负责处理创建订单的整个流程。

  • 优点:
    1. 关注点分离: 事务逻辑集中在编排器中,业务服务保持纯粹,只关注自身领域。
    2. 独立扩展: 编排服务可以根据事务处理的负载独立进行伸缩。
  • 缺点:
    1. 增加延迟: 请求路径变为 Client -> Gateway -> Orchestrator -> Business Services,多了一次网络往返。
    2. 架构复杂性: 引入了一个新的、有状态的关键组件,需要为其设计高可用、监控和持久化方案。
    3. 服务发现与治理: 编排器需要知道所有下游服务的地址和接口,增加了服务治理的复杂性。

方案B:API Gateway 内置声明式协调器插件

另一种思路是将事务协调能力下沉到基础设施层,即 API Gateway。网关作为所有请求的入口,天然是执行跨服务策略的理想位置。我们可以设计一个插件,它不包含硬编码的业务逻辑,而是通过一份“执行计划”(我们称之为“Saga Manifest”)来动态协调事务。

  • 优点:

    1. 简化架构: 无需引入新的服务,客户端视角看,整个复杂操作就是一个对网关的原子API调用。
    2. 降低延迟: 减少了内部服务间的网络跳数。
    3. 能力复用: 事务协调成为网关提供的一种通用能力,可以被任何需要跨服务调用的API复用,只需提供不同的 Saga Manifest。
    4. 声明式配置: 业务流程的定义从代码中剥离,变为可配置、可管理的资源。
  • 缺点:

    1. 网关职责加重: “智能网关”可能成为反模式。网关需要承担状态管理和更复杂的逻辑,对其性能和稳定性要求更高。
    2. 状态持久化: 为保证Saga流程在网关重启后仍可恢复,网关插件需要引入持久化存储(如Redis或数据库),使其变为有状态服务。

决策与理由

在真实项目中,许多跨服务的操作模式是固定的。将这种模式化的协调逻辑沉淀为平台能力,比为每个业务流程都创建一个专有编排器更具成本效益。因此,我们选择 方案B。我们的目标是构建一个高性能、可插拔的Saga协调器插件,它通过解析一份声明式的YAML配置来执行分布式事务,而不是在网关中硬编码任何业务逻辑。这既发挥了网关的中心化优势,又避免了其业务逻辑过度膨胀。

三、核心实现:基于 Go 构建可插拔的 Saga 协调器

我们将使用 Go 语言来构建这个 API Gateway 插件,因为它在网络编程和并发处理方面表现出色。

1. 定义 Saga Manifest

首先,定义一个声明式YAML文件格式,用于描述一个完整的Saga流程。这份文件应该包含事务的各个阶段(steps)、每个阶段的正向操作(action)和补偿操作(compensation)。

# file: create_order_saga.yaml
name: CreateOrderSaga
entrypoint: /api/v1/orders/create
steps:
  - name: CreateOrder
    description: "创建订单记录"
    action:
      method: POST
      # targetService 指向网关配置的上游服务名
      targetService: "order-service"
      path: "/orders"
      # 从请求体中提取参数
      body:
        orderId: "{uuid()}"
        userId: "{request.body.userId}"
        items: "{request.body.items}"
    compensation:
      method: PUT
      targetService: "order-service"
      path: "/orders/{steps.CreateOrder.response.body.orderId}/cancel"

  - name: DecreaseStock
    description: "扣减商品库存"
    dependencies: [CreateOrder]
    action:
      method: POST
      targetService: "stock-service"
      path: "/stock/decrease"
      body:
        transactionId: "{steps.CreateOrder.response.body.orderId}"
        items: "{request.body.items}"
    compensation:
      method: POST
      targetService: "stock-service"
      path: "/stock/increase"
      body:
        transactionId: "{steps.CreateOrder.response.body.orderId}"
        items: "{request.body.items}"

  - name: ProcessPayment
    description: "处理支付"
    dependencies: [DecreaseStock]
    action:
      method: POST
      targetService: "payment-service"
      path: "/payments"
      body:
        orderId: "{steps.CreateOrder.response.body.orderId}"
        amount: "{request.body.totalAmount}"
        userId: "{request.body.userId}"
    # 支付操作通常没有简单的补偿,或者补偿逻辑非常复杂(如退款流程),
    # 在此简化,实际中可能需要调用退款接口或进入人工处理流程。
    compensation:
      method: POST
      targetService: "payment-service"
      path: "/refunds"
      body:
        orderId: "{steps.CreateOrder.response.body.orderId}"

这份Manifest定义了一个名为 CreateOrderSaga 的流程,包含三个步骤。我们使用 {...} 语法来引用上下文变量,例如从初始请求体 request.body 或之前步骤的响应 steps.CreateOrder.response.body 中取值。

2. Saga 执行器核心代码 (Go)

Saga插件的核心是一个执行器(Executor),它负责解析 Manifest、管理事务状态并按顺序执行或回滚操作。

// pkg/saga/executor.go

package saga

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	// 假设我们有内部的HTTP客户端和状态存储
	"my-gateway/pkg/httpclient"
	"my-gateway/pkg/statestore"
)

// SagaContext holds the state of a running Saga instance.
// It's the context passed between steps.
type SagaContext struct {
	InitialRequest *http.Request
	StepResponses  map[string]interface{} // Stores responses from each step
	mu             sync.RWMutex
}

// Executor runs a Saga defined by a Manifest.
type Executor struct {
	manifest   *Manifest
	stateStore statestore.Store // Interface for Redis, etc.
	httpClient httpclient.Client
}

// NewExecutor creates a new Saga executor.
func NewExecutor(manifest *Manifest, store statestore.Store, client httpclient.Client) *Executor {
	return &Executor{
		manifest:   manifest,
		stateStore: store,
		httpClient: client,
	}
}

// TransactionState represents the persisted state of a saga transaction.
type TransactionState struct {
	ID              string
	Status          string // PENDING, EXECUTING, COMPENSATING, SUCCEEDED, FAILED
	CurrentStep     int
	CompletedSteps  []string
	CompensationLog []string
	Context         SagaContext
}

// Execute starts the Saga transaction.
func (e *Executor) Execute(ctx context.Context, initialRequest *http.Request) error {
	sagaCtx := &SagaContext{
		InitialRequest: initialRequest,
		StepResponses:  make(map[string]interface{}),
	}
    
    // 真正的生产级实现中, transactionId 应当更健壮
	transactionID := fmt.Sprintf("saga-%d", time.Now().UnixNano())
	log.Printf("[SAGA:%s] Starting transaction for manifest: %s", transactionID, e.manifest.Name)

	executedSteps := make([]*Step, 0, len(e.manifest.Steps))

	for i, step := range e.manifest.Steps {
		// 在生产环境中,每一步的状态都应持久化,以便在网关崩溃后恢复
		log.Printf("[SAGA:%s] Executing step: %s", transactionID, step.Name)
		
        // 1. 模板渲染: 用上下文替换 action 中的占位符
		renderedAction, err := e.renderTemplate(step.Action, sagaCtx)
		if err != nil {
			log.Printf("[SAGA:%s] Failed to render template for step %s: %v", transactionID, step.Name, err)
			e.compensate(ctx, transactionID, executedSteps, sagaCtx)
			return fmt.Errorf("template rendering failed for step %s: %w", step.Name, err)
		}
        
        // 2. 执行HTTP请求
		resp, err := e.httpClient.Do(ctx, renderedAction)
		if err != nil || resp.StatusCode >= 400 {
			log.Printf("[SAGA:%s] Step %s failed. Status: %d, Error: %v", transactionID, step.Name, resp.StatusCode, err)
			e.compensate(ctx, transactionID, executedSteps, sagaCtx)
			return fmt.Errorf("step %s execution failed: %w", step.Name, err)
		}

		// 3. 保存当前步骤的响应到上下文中
		sagaCtx.mu.Lock()
		sagaCtx.StepResponses[step.Name] = resp.Body // 假设响应体已解析为 map[string]interface{}
		sagaCtx.mu.Unlock()

		executedSteps = append(executedSteps, &step)
		log.Printf("[SAGA:%s] Step %s succeeded.", transactionID, step.Name)
	}

	log.Printf("[SAGA:%s] Transaction completed successfully.", transactionID)
	return nil
}

// compensate runs compensation actions for all executed steps in reverse order.
func (e *Executor) compensate(ctx context.Context, transactionID string, executedSteps []*Step, sagaCtx *SagaContext) {
	log.Printf("[SAGA:%s] Starting compensation...", transactionID)
	for i := len(executedSteps) - 1; i >= 0; i-- {
		step := executedSteps[i]
		if step.Compensation == nil {
			log.Printf("[SAGA:%s] No compensation defined for step %s. Skipping.", transactionID, step.Name)
			continue
		}

		log.Printf("[SAGA:%s] Compensating step: %s", transactionID, step.Name)
		
        renderedComp, err := e.renderTemplate(*step.Compensation, sagaCtx)
        if err != nil {
            // 这是一个严重问题,补偿模板渲染失败可能导致数据不一致
            // 需要告警和人工干预
            log.Printf("[SAGA:%s] CRITICAL: Failed to render compensation template for step %s: %v", transactionID, step.Name, err)
            continue
        }

		// 补偿操作的重试策略至关重要。一个常见的错误是认为补偿操作永远不会失败。
		// 在真实项目中,这里必须有强大的重试机制(例如指数退避)。
		maxRetries := 3
		for attempt := 1; attempt <= maxRetries; attempt++ {
			_, err := e.httpClient.Do(ctx, renderedComp)
			if err == nil {
				log.Printf("[SAGA:%s] Compensation for step %s succeeded.", transactionID, step.Name)
				break
			}
			log.Printf("[SAGA:%s] Compensation for step %s failed (attempt %d/%d): %v", transactionID, step.Name, attempt, maxRetries, err)
			if attempt == maxRetries {
				// 告警!人工介入!
				log.Printf("[SAGA:%s] CRITICAL: Compensation for step %s failed after all retries.", transactionID, step.Name)
			}
			time.Sleep(time.Duration(attempt) * time.Second) // Exponential backoff
		}
	}
}

// renderTemplate is a placeholder for a template rendering engine
// that replaces placeholders like {request.body.userId} with actual values.
func (e *Executor) renderTemplate(action Action, sagaCtx *SagaContext) (httpclient.Request, error) {
	// 实际实现会使用 Go 的 template 包或更复杂的表达式引擎
	// 这里为了演示,我们只做一个简单的模拟
	// ... 实现细节省略 ...
	log.Println("Rendering template for action:", action.Path)
	return httpclient.Request{
        Method: action.Method,
        URL: fmt.Sprintf("http://%s%s", action.TargetService, action.Path), // URL 拼接也需要更健壮
        Body: action.Body, // Body中的模板也需要渲染
    }, nil
}

这段代码勾勒出了执行器的核心逻辑:

  1. 顺序执行: 遍历 Manifest 中的 steps 并执行 action
  2. 上下文传递: 每一步的执行结果都存入 SagaContext,供后续步骤使用。
  3. 失败补偿: 任何一步失败,立即调用 compensate 函数,该函数会逆序执行所有已成功步骤的 compensation 操作。
  4. 幂等性: 一个关键的、代码中未完全体现的细节是,actioncompensation 的实现方(即业务服务)必须保证接口的幂等性。例如,重复调用扣减库存的补偿接口(增加库存),结果应该和调用一次相同。这通常通过传递唯一的 transactionId 来实现。

3. 网关管理界面与 UnoCSS

为了让运维人员能够方便地管理和监控这些 Saga Manifests,我们需要一个管理后台。这个后台本身也是一个Web应用,其前端界面的开发效率至关重要。

这里就是 UnoCSS 发挥作用的地方。UnoCSS 是一个原子化的 CSS 引擎,它允许我们通过 class name 直接在 HTML 中构建样式,极大地提升了开发效率,特别适合快速迭代内部工具和仪表盘。

假设我们用 Vue.js 来构建这个管理界面,我们可以创建一个组件来展示和编辑 Saga Manifest。

<!-- components/SagaEditor.vue -->
<template>
  <div class="p-4 bg-gray-800 text-white rounded-lg shadow-xl font-mono">
    <h2 class="text-xl font-bold mb-4 border-b border-gray-600 pb-2">
      Saga Manifest Editor: {{ manifest.name }}
    </h2>
    <div v-for="(step, index) in manifest.steps" :key="index" class="mb-6 p-4 bg-gray-700 rounded">
      <div class="flex justify-between items-center">
        <h3 class="text-lg font-semibold text-cyan-400">Step {{ index + 1 }}: {{ step.name }}</h3>
        <button class="i-carbon-trash-can text-red-500 hover:text-red-400 text-xl"></button>
      </div>
      
      <!-- Action Section -->
      <div class="mt-4 p-3 bg-gray-900/50 rounded">
        <p class="font-bold text-green-400">Action</p>
        <div class="mt-2 flex space-x-2">
          <span class="px-2 py-1 bg-green-800 text-green-200 rounded text-sm font-bold">{{ step.action.method }}</span>
          <span class="px-2 py-1 bg-gray-600 text-gray-200 rounded text-sm">{{ step.action.targetService }}</span>
        </div>
        <p class="mt-2 text-sm text-gray-300">{{ step.action.path }}</p>
      </div>

      <!-- Compensation Section -->
      <div v-if="step.compensation" class="mt-3 p-3 bg-yellow-900/50 rounded">
        <p class="font-bold text-yellow-400">Compensation</p>
        <div class="mt-2 flex space-x-2">
          <span class="px-2 py-1 bg-yellow-800 text-yellow-200 rounded text-sm font-bold">{{ step.compensation.method }}</span>
          <span class="px-2 py-1 bg-gray-600 text-gray-200 rounded text-sm">{{ step.compensation.targetService }}</span>
        </div>
        <p class="mt-2 text-sm text-gray-300">{{ step.compensation.path }}</p>
      </div>
    </div>
    
    <button class="w-full mt-4 py-2 px-4 bg-indigo-600 hover:bg-indigo-500 rounded font-bold transition-colors">
      Save Changes
    </button>
  </div>
</template>

<script setup>
// Props for manifest data
defineProps({
  manifest: Object,
});
</script>

这段代码中,p-4, bg-gray-800, text-white, rounded-lg, flex, justify-between 等都是 UnoCSS 的原子化 class。我们没有写一行传统的 CSS 代码,就快速构建出了一个结构清晰、样式丰富的界面。这种开发模式使得我们可以将更多精力投入到与 API Gateway Admin API 的交互逻辑上,而不是CSS的调试中。当需要增加一个新的监控状态或配置项时,只需在模板中添加几行带有原子化类的 HTML 即可。

四、架构的扩展性与局限性

这个基于 API Gateway 的声明式 Saga 协调器方案,本质上是在基础设施层面提供了一种“事务即服务”(Transaction-as-a-Service)的能力。

扩展性:

  1. 支持更多模式: 当前实现的是 Saga 的编排(Orchestration)模式。未来插件可以扩展,支持如 TCC(Try-Confirm-Cancel)等其他分布式事务模式,只需在 Manifest 中增加相应的字段和执行逻辑。
  2. 异步与长事务: 对于耗时较长的事务,可以引入消息队列。网关插件在接收到请求后,将Saga任务投递到队列中,并立即返回一个事务ID。客户端可以通过该ID轮询事务状态。这需要执行器与持久化存储更紧密的集成。
  3. 可视化与监控: 结合管理后台,可以对进行中的Saga事务进行实时可视化追踪,展示每一步的状态、耗时和上下文数据,极大地提升了问题排查的效率。

局限性:

  1. 网关的性能开销: 在请求的关键路径上增加如此复杂的解析、执行和补偿逻辑,必然会对网关的性能和延迟产生影响。必须对Saga插件进行严格的性能测试和优化,甚至考虑将其运行在独立的线程池中,与核心的路由转发逻辑隔离。
  2. 不适用于所有场景: 此方案非常适合有明确步骤和补偿逻辑的、由API调用驱动的业务流程。但对于事件驱动、拓扑结构复杂的“舞蹈式”(Choreography)Saga,此方案则不适用。舞蹈式Saga更适合通过消息总线实现,服务间通过订阅/发布事件来协作。
  3. 状态存储的依赖: 该方案将网关从一个无状态组件变成了有状态组件,对外部的状态存储(如Redis, etcd)产生了强依赖。该存储的可用性和性能直接决定了整个事务协调服务的可靠性。这在架构设计和运维上引入了新的挑战。

  目录