基于 gRPC-Go 与 Paxos 构建分布式 Emotion 原子化 CSS 编译服务


在大型微前端架构中,当数十个团队并行开发并部署组件时,确保构建产物的一致性成为一个棘手的挑战。我们遇到的一个具体问题源于 CSS-in-JS 库 EmotionEmotion 在构建时会为样式生成原子化的、独一无二的类名,但在一个分布式的 CI/CD 环境中,不同构建任务可能会为逻辑上完全相同的组件生成不同的类名,这取决于构建时的上下文。这会导致在集成环境中出现样式冲突和覆盖,调试过程极为痛苦。我们需要一个权威的、高可用的服务来统一管理这种“原子化”编译过程,保证“相同组件源码输入,永远得到相同原子化 CSS 输出”。

方案权衡:中心化数据库 vs. 分布式状态机

最初的构想是建立一个中心化的 gRPC 服务。所有 CI/CD 流水线在构建前端组件时,都调用这个服务的接口。服务接收组件源码,计算其哈希值,然后查询一个中心数据库(例如 PostgreSQL 或 Redis)。如果哈希值已存在,直接返回已编译好的 CSS 和类名;如果不存在,则执行编译,并将结果存入数据库。

方案A:中心化数据库

graph TD
    subgraph "CI/CD Pipeline"
        Agent1 -->|gRPC Call| CentralService
        Agent2 -->|gRPC Call| CentralService
        Agent3 -->|gRPC Call| CentralService
    end

    CentralService <--> D[PostgreSQL/Redis]

    style CentralService fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px
  • 优点:
    • 实现简单,技术栈成熟。
    • 逻辑清晰,易于维护。
  • 缺点:
    • 单点故障 (SPOF): 中心服务或数据库一旦宕机,所有前端团队的构建流程都会被阻塞。
    • 性能瓶颈: 随着构建任务的增多,数据库将成为写入和查询的瓶颈。
    • 跨区域延迟: 如果构建节点分布在不同地理位置,访问单一中心数据库的延迟会显著拖慢构建速度。

在真实项目中,构建系统的稳定性和效率至关重要。任何阻塞都可能导致交付延迟。因此,单点故障是不可接受的。我们需要一个更高可用、容错的架构。

方案B:基于 Paxos 的分布式状态机

这个方案摒弃了外部中心数据库,将状态直接内嵌在服务节点自身。我们部署一个由多个对等节点组成的集群。每个节点都维护一份组件源码哈希到编译结果的映射表(即状态机)。当一个编译请求到达任意节点时,该节点会发起一次 Paxos 共识过程,让整个集群就“这个新组件的编译结果是什么”达成一致。一旦达成共iscensus,所有节点都会将这个新的映射关系应用到自己的状态机中。

graph TD
    subgraph "CI/CD Pipeline"
        Agent1 -->|gRPC Call| LoadBalancer
        Agent2 -->|gRPC Call| LoadBalancer
        Agent3 -->|gRPC Call| LoadBalancer
    end

    subgraph "Distributed Compiler Service Cluster"
        NodeA <-->|Paxos Protocol| NodeB
        NodeB <-->|Paxos Protocol| NodeC
        NodeC <-->|Paxos Protocol| NodeA
    end

    LoadBalancer --> NodeA
    LoadBalancer --> NodeB
    LoadBalancer --> NodeC

    subgraph NodeA
        direction LR
        gRPC_A[gRPC Server] -- writes to --> SM_A(State Machine)
        Paxos_A[Paxos Instance] -- updates --> SM_A
    end
    subgraph NodeB
        direction LR
        gRPC_B[gRPC Server] -- writes to --> SM_B(State Machine)
        Paxos_B[Paxos Instance] -- updates --> SM_B
    end
    subgraph NodeC
        direction LR
        gRPC_C[gRPC Server] -- writes to --> SM_C(State Machine)
        Paxos_C[Paxos Instance] -- updates --> SM_C
    end
    
    style NodeA fill:#ccf,stroke:#333,stroke-width:2px
    style NodeB fill:#ccf,stroke:#333,stroke-width:2px
    style NodeC fill:#ccf,stroke:#333,stroke-width:2px
  • 优点:
    • 高可用: 只要集群中超过半数的节点存活,服务就能正常工作。
    • 强一致性: Paxos 算法保证了所有节点的状态机最终是一致的。
    • 低延迟读取: 查询操作可以直接在本地节点内存中完成,速度极快。
  • 缺点:
    • 实现复杂: Paxos 算法以难以理解和正确实现而著称。
    • 写入性能: 每次写入(新的编译)都需要经过至少两次网络往返的共识过程,性能低于直接写数据库。

最终决策

考虑到构建基础设施对可用性的严苛要求,我们选择方案B。虽然实现复杂,但它从根本上解决了单点故障问题。对于我们的场景——编译缓存,写入频率远低于读取频率,Paxos 的写入延迟是完全可以接受的。一次性的实现投入,换来的是整个平台的长期稳定。

核心实现概览

我们将使用 Go 语言来实现这个服务。Go 的并发模型和强大的标准库非常适合构建这类网络密集型的分布式系统。

1. gRPC 服务定义

首先,定义 gRPC 接口。接口非常简单,只有一个 Compile 方法,接收组件源码,返回编译后的 CSS 和原子类名。

api/compiler/v1/compiler.proto

syntax = "proto3";

package compiler.v1;

option go_package = "github.com/your-org/atomic-compiler/api/compiler/v1;v1";

// CompilerService 提供了原子化的 CSS 编译能力
service CompilerService {
  // Compile 接收组件源码,返回经过共识确认的原子化 CSS
  rpc Compile(CompileRequest) returns (CompileResponse) {}
}

message CompileRequest {
  // 组件的原始源码,用于计算哈希和执行编译
  string component_source_code = 1;
}

message CompileResponse {
  // 编译后的 CSS 内容
  string compiled_css = 1;
  // 生成的原子化、唯一的类名
  string atomic_class_name = 2;
  // 指示此结果是来自缓存还是新编译的
  bool from_cache = 3;
}

2. Paxos 节点核心数据结构与逻辑

我们将实现一个简化的、针对单次决议 (Single-Decree Paxos) 的 Paxos 实例。在真实生产环境中,通常会使用 Multi-Paxos 或 Raft 来处理连续的日志条目,但为了清晰地展示核心思想,Single-Decree Paxos 已足够。

internal/paxos/node.go

package paxos

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
	// ... 其他导入
)

// Proposer 负责发起提议
type Proposer struct {
	mu           sync.Mutex
	nodeID       int
	proposalID   int
	majoritySize int
	peers        map[int]PeerClient // gRPC 客户端连接到其他节点
}

// Acceptor 负责响应提议
type Acceptor struct {
	mu            sync.Mutex
	promisedID    int // 已承诺的最高提案ID
	acceptedID    int // 已接受的提案ID
	acceptedValue *string // 已接受的值
}

// Node 代表一个 Paxos 实例,它既是 Proposer 也是 Acceptor
type Node struct {
	mu       sync.Mutex
	id       int
	proposer *Proposer
	acceptor *Acceptor
	// 这里简化,实际项目中应有 Learner 角色
	chosenValue  *string
	peers        map[int]string // peer id -> address
	grpcServer *grpc.Server
}

// Paxos 消息定义
// PrepareRequest: Proposer -> Acceptor (Phase 1a)
type PrepareRequest struct {
	ProposalID int
}

// PrepareResponse: Acceptor -> Proposer (Phase 1b, "Promise")
type PrepareResponse struct {
	Ok            bool
	PromisedID    int
	AcceptedID    int
	AcceptedValue *string
}

// AcceptRequest: Proposer -> Acceptor (Phase 2a)
type AcceptRequest struct {
	ProposalID int
	Value      string
}

// AcceptResponse: Acceptor -> Proposer (Phase 2b, "Accepted")
type AcceptResponse struct {
	Ok bool
}


// Propose 是 Proposer 的核心逻辑
// 它尝试让集群就 `value` 达成共识
func (p *Proposer) Propose(ctx context.Context, value string) (string, error) {
	p.mu.Lock()
	p.proposalID++
	proposalID := p.proposalID
	p.mu.Unlock()

	// --- Phase 1: Prepare ---
	log.Printf("[Node %d] Proposer: Starting Phase 1 (Prepare) for proposal %d", p.nodeID, proposalID)
	
	prepareResponses := p.broadcastPrepare(ctx, proposalID)
	
	promiseCount := 0
	highestAcceptedID := -1
	var latestAcceptedValue *string
	
	for _, resp := range prepareResponses {
		if resp != nil && resp.Ok {
			promiseCount++
			if resp.AcceptedID > highestAcceptedID {
				highestAcceptedID = resp.AcceptedID
				latestAcceptedValue = resp.AcceptedValue
			}
		}
	}

	if promiseCount < p.majoritySize {
		log.Printf("[Node %d] Proposer: Proposal %d failed. Not enough promises (%d/%d)", p.nodeID, proposalID, promiseCount, len(p.peers)+1)
		return "", fmt.Errorf("proposal rejected: could not get majority promise")
	}

	log.Printf("[Node %d] Proposer: Proposal %d received enough promises (%d)", p.nodeID, proposalID, promiseCount)

	// 如果在 Prepare 阶段发现了已经被接受的值,必须提议这个值
	proposalValue := value
	if latestAcceptedValue != nil {
		log.Printf("[Node %d] Proposer: Found an already accepted value '%s' from a previous proposal. Proposing it instead.", p.nodeID, *latestAcceptedValue)
		proposalValue = *latestAcceptedValue
	}

	// --- Phase 2: Accept ---
	log.Printf("[Node %d] Proposer: Starting Phase 2 (Accept) for proposal %d with value '%s'", p.nodeID, proposalID, proposalValue)
	
	acceptResponses := p.broadcastAccept(ctx, proposalID, proposalValue)

	acceptCount := 0
	for _, resp := range acceptResponses {
		if resp != nil && resp.Ok {
			acceptCount++
		}
	}

	if acceptCount < p.majoritySize {
		log.Printf("[Node %d] Proposer: Proposal %d failed. Not enough accepts (%d/%d)", p.nodeID, proposalID, acceptCount, len(p.peers)+1)
		return "", fmt.Errorf("proposal failed: could not get majority accept")
	}

	log.Printf("[Node %d] Proposer: Proposal %d with value '%s' has been CHOSEN!", p.nodeID, proposalID, proposalValue)
	
	// 在此通知 Learner 学习这个值
	// ... Learner logic here ...

	return proposalValue, nil
}

// HandlePrepare 是 Acceptor 的核心逻辑
func (a *Acceptor) HandlePrepare(req *PrepareRequest) *PrepareResponse {
	a.mu.Lock()
	defer a.mu.Unlock()

	if req.ProposalID > a.promisedID {
		log.Printf("Acceptor: Promising for proposal %d (previously promised %d)", req.ProposalID, a.promisedID)
		a.promisedID = req.ProposalID
		// 持久化 promisedID 以防崩溃
		// ... persistence logic ...
		return &PrepareResponse{
			Ok:            true,
			PromisedID:    a.promisedID,
			AcceptedID:    a.acceptedID,
			AcceptedValue: a.acceptedValue,
		}
	}

	log.Printf("Acceptor: Rejecting proposal %d (already promised %d)", req.ProposalID, a.promisedID)
	return &PrepareResponse{Ok: false, PromisedID: a.promisedID}
}

// HandleAccept 是 Acceptor 的核心逻辑
func (a *Acceptor) HandleAccept(req *AcceptRequest) *AcceptResponse {
	a.mu.Lock()
	defer a.mu.Unlock()

	if req.ProposalID >= a.promisedID {
		log.Printf("Acceptor: Accepting proposal %d with value '%s'", req.ProposalID, req.Value)
		a.promisedID = req.ProposalID
		a.acceptedID = req.ProposalID
		a.acceptedValue = &req.Value
		// 持久化状态以防崩溃
		// ... persistence logic ...
		return &AcceptResponse{Ok: true}
	}

	log.Printf("Acceptor: Rejecting accept request for proposal %d (promised %d)", req.ProposalID, a.promisedID)
	return &AcceptResponse{Ok: false}
}

// ... 广播 Prepare 和 Accept 请求的辅助函数 (broadcastPrepare, broadcastAccept) 需要实现 ...
// 这些函数会遍历 p.peers,并使用 gRPC 客户端并行地发送请求

代码注释中的关键点:

  • 角色分离: 尽管一个物理节点同时扮演多个角色,但在逻辑上将 ProposerAcceptor 的代码分开,能让逻辑更清晰。
  • 锁: Paxos 算法的正确性严重依赖于对内部状态的原子性操作,因此 sync.Mutex 的使用至关重要。
  • 持久化: 在注释中提到了持久化。在生产级实现中,promisedID, acceptedIDacceptedValue 必须在更新后立即写入稳定的存储(如磁盘文件),以防止节点崩溃后状态丢失,破坏算法的安全性。
  • 提案ID: 提案ID必须是单调递增且唯一的。一种常见的做法是结合时间戳和一个节点唯一的ID来生成。

3. 状态机与 gRPC 服务集成

现在,我们将 Paxos 实例与我们的业务逻辑——编译服务——结合起来。

internal/server/grpc.go

package server

import (
	"context"
	"crypto/sha256"
	"fmt"
	"log"
	"sync"

	v1 "github.com/your-org/atomic-compiler/api/compiler/v1"
	"github.com/your-org/atomic-compiler/internal/paxos"
)

// StateMachine 是我们的应用层状态
type StateMachine struct {
	mu     sync.RWMutex
	store  map[string]string // key: component hash, value: compiled css
}

func NewStateMachine() *StateMachine {
	return &StateMachine{
		store: make(map[string]string),
	}
}

func (sm *StateMachine) Get(key string) (string, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	val, ok := sm.store[key]
	return val, ok
}

func (sm *StateMachine) Apply(key, value string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	// 这里的 Apply 必须是幂等的
	if _, ok := sm.store[key]; !ok {
		sm.store[key] = value
		log.Printf("State Machine: Applied new entry for key %s", key)
	}
}


// GrpcServer 实现了 gRPC 服务接口
type GrpcServer struct {
	v1.UnimplementedCompilerServiceServer
	
	node *paxos.Node // Paxos 节点实例
	sm   *StateMachine // 应用状态机
}

func NewGrpcServer(node *paxos.Node, sm *StateMachine) *GrpcServer {
	return &GrpcServer{node: node, sm: sm}
}


// Compile 是 gRPC 接口的核心实现
func (s *GrpcServer) Compile(ctx context.Context, req *v1.CompileRequest) (*v1.CompileResponse, error) {
	// 1. 计算源码哈希,作为状态机的 key
	hash := fmt.Sprintf("%x", sha256.Sum256([]byte(req.ComponentSourceCode)))

	// 2. 优先从本地状态机读取,这是高性能的关键
	if css, ok := s.sm.Get(hash); ok {
		log.Printf("Serving request for hash %s from local cache", hash)
		return &v1.CompileResponse{
			CompiledCss:       css,
			AtomicClassName:   extractClassName(css), // 辅助函数
			FromCache:         true,
		}, nil
	}

	// 3. 如果本地没有,说明可能需要进行一次新的编译和共识
	//    这里是业务逻辑与 Paxos 协议的结合点
	log.Printf("Cache miss for hash %s. Initiating compilation and consensus.", hash)

	// 模拟编译过程。在真实项目中,这里会调用 JS 运行时 (如 V8) 来执行 Emotion 的编译逻辑
	compiledCSS := s.executeEmotionCompilation(req.ComponentSourceCode, hash)

	// 4. 将编译结果作为提议的值,发起 Paxos 共识
	//    注意:这里的 value 是 key-value 对的序列化形式,以保证所有节点对“哪个key对应哪个value”达成一致
	proposalValue := fmt.Sprintf("%s:%s", hash, compiledCSS)
	
	// 在一个真实的 Multi-Paxos/Raft 系统中,这里会是 client.Propose(command)
	chosenValue, err := s.node.Propose(ctx, proposalValue) 
	if err != nil {
		log.Printf("ERROR: Paxos consensus failed for hash %s: %v", hash, err)
		return nil, fmt.Errorf("failed to reach consensus on compilation result: %w", err)
	}

	// 5. 将达成共识的值应用到状态机
	//    即使提议失败,Propose 方法也可能返回一个由其他节点提议并已达成共识的值。
	//    所以无论如何,都要解析 chosenValue 并应用。
	var chosenHash, chosenCSS string
	fmt.Sscanf(chosenValue, "%s:%s", &chosenHash, &chosenCSS)

	// 保证所有节点都应用相同的状态变更
	s.sm.Apply(chosenHash, chosenCSS)


	// 6. 返回最终达成共识的结果
	return &v1.CompileResponse{
		CompiledCss:       chosenCSS,
		AtomicClassName:   extractClassName(chosenCSS),
		FromCache:         false, 
	}, nil
}

func (s *GrpcServer) executeEmotionCompilation(source, hash string) string {
	// 这是一个桩函数。实际实现会非常复杂。
	// 它需要调用一个 JS 引擎,加载 Emotion 库,然后编译 `source`
	// 为了保证确定性,生成的类名应该与 hash 相关。
	// e.g., "css-a1b2c3d4" where "a1b2c3d4" is part of the hash.
	className := fmt.Sprintf("css-%s", hash[:8])
	return fmt.Sprintf(".%s { color: red; } /* compiled from source */", className)
}

func extractClassName(css string) string {
	// 同样是一个辅助桩函数
	if len(css) > 10 {
		return fmt.Sprintf("css-%s", "extracted")
	}
	return "unknown"
}

集成逻辑剖析:

  • 读写分离: 读取操作(sm.Get)是纯本地的,不涉及网络,速度快。写入操作则必须触发共识协议,是重量级的。
  • 幂等性: sm.Apply 必须是幂等的。因为共识消息可能会重传,状态机可能会多次收到应用同一个值的指令,必须保证多次应用和一次应用的效果相同。
  • 提议内容: 我们提议的不是单纯的 CSS,而是 hash:css 这样的字符串。这是为了防止出现混乱:如果节点A为hash1提议了css1,而节点B同时为hash2提议了css2,我们必须保证最终被选定的值包含了完整的上下文。
  • 失败与重试: Paxos 过程可能会因为网络分区、节点宕机或提案冲突而失败。在 Compile 方法中,如果 s.node.Propose 返回错误,客户端应该进行重试。一个健壮的实现会包含退避重试逻辑。

架构的扩展性与局限性

这个架构为我们的构建系统带来了前所未有的稳定性和一致性。它不仅解决了 Emotion 的原子化编译问题,其核心模式——分布式一致性状态机——还可以被复用。例如,我们可以用它来管理全局唯一的构建版本号,或者维护一个跨 CI/CD 流程的共享资源锁。

然而,当前实现也存在明显的局限性:

  1. 简化的 Paxos: 我们实现的是 Single-Decree Paxos,每次决议都是独立的。这对于一个完整的系统来说效率低下。生产环境应当使用 Multi-Paxos 或更易于理解和实现的 Raft 算法,它们将一系列操作组织成一个连续的、可复制的日志,大大提高了吞吐量。
  2. 集群成员管理: 当前实现假设集群成员是静态的,在配置文件中写死。一个真正的生产系统需要动态的成员管理机制,允许节点的加入和离开,这通常需要实现另一轮共识。
  3. 状态机快照与日志压缩: 随着时间推移,内存中的状态机会越来越大,Paxos 日志也会无限增长。必须实现快照机制,定期将内存状态持久化,并丢弃之前的日志,否则节点重启恢复会非常缓慢。
  4. 复杂性成本: 最大的局限性是复杂性本身。引入 Paxos 意味着团队需要具备深入理解和维护复杂分布式系统的能力。这是一笔不小的技术投资,只应在问题域的可用性和一致性要求足够高时才考虑。对于许多小型项目,一个带主备切换的中心化数据库或许是更务实的选择。

  目录