ScyllaDB与Trino联邦查询构建实时特征分析平台的架构决策


我们面临一个具体的工程挑战:一个高并发的AI推荐服务,每秒产生数十万次用户行为事件。这些事件必须以极低的延迟写入数据库,用于实时用户画像的更新与读取,这是典型的OLTP场景。同时,数据科学家和运营团队需要一个交互式仪表盘,对这些最近几小时甚至几天内的原始事件数据进行探索性分析(ad-hoc),以验证模型假设、诊断异常或发现新特征。

业务对这个分析仪表盘的核心要求是:数据新鲜度必须在分钟级别,复杂聚合查询的P95延迟必须在10秒以内。传统的解决方案无法满足这种对新鲜度和交互性的双重苛求。

定义问题边界与约束

首先,我们量化一下技术指标:

  • 写入吞吐: 峰值可达 200,000 writes/sec。
  • 单点读取延迟: P99 < 10ms (用于线上服务)。
  • 数据总量: 每日增量约 5TB 原始事件。
  • 分析查询模式: Ad-hoc,包含GROUP BY, JOIN, COUNT(DISTINCT), 窗口函数等。
  • 分析查询延迟: P95 < 10s。
  • 数据新鲜度: 分析仪表盘看到的数据延迟不超过5分钟。
  • 架构约束: 尽可能减少数据冗余和ETL管道的复杂性,控制运维成本。

ScyllaDB因其与Cassandra兼容的API和极致的I/O性能,已被选定为线上服务的核心OLTP数据库,它完全能满足写入和单点读取的需求。问题的核心在于如何优雅地嫁接上高性能的OLAP能力。

方案A:经典的离线数仓架构

这是最容易想到的方案。架构如下:

graph TD
    subgraph OLTP
        Service[推荐服务] --> ScyllaDB
    end
    
    subgraph ETL Pipeline
        ScyllaDB -- 定时导出 (Spark Batch) --> DataLake[数据湖 S3/HDFS]
        DataLake -- T+1 加载 --> DWH[数据仓库 ClickHouse/Snowflake]
    end
    
    subgraph OLAP
        Dashboard[分析仪表盘] --> DWH
    end
  • 实现路径: 每天凌晨通过Spark作业,将ScyllaDB的数据快照导出为Parquet格式存入S3。然后,另一个作业将这些Parquet文件加载到ClickHouse集群中。SwiftUI构建的仪表盘直接查询ClickHouse。
  • 优势:
    1. 物理隔离: OLTP和OLAP负载完全分离,分析查询的性能风暴不会影响线上服务。
    2. 成熟稳定: 这是业界最成熟的方案,生态工具链非常完善。
    3. 查询性能: ClickHouse这类列式存储数据库对聚合查询的性能优化是极致的。
  • 劣势:
    1. 数据延迟: 整个流程是T+1的,数据新鲜度是天级别,完全不满足“分钟级”的核心需求。
    2. 存储成本: 数据在ScyllaDB、S3、ClickHouse中存储了三份,成本高昂。
    3. 运维复杂度: 维护一个Spark集群和ClickHouse集群,以及调度ETL作业,带来了显著的运维负担。

在我们的场景下,数据延迟是不可逾越的障碍。此方案被直接否决。

方案B:流式计算实时聚合架构

为了解决数据延迟问题,自然会想到流式处理。

graph TD
    subgraph Ingestion
        Service[推荐服务] --> Kafka
    end
    
    subgraph Data Flow
        Kafka -- Stream 1 --> ScyllaDB[(ScyllaDB)]
        Kafka -- Stream 2 --> Flink[Flink实时聚合]
    end
    
    subgraph Storage & Serving
        Flink --> OLAP_DB[实时OLAP库 Druid/ClickHouse]
        Dashboard[分析仪表盘] --> OLAP_DB
    end
  • 实现路径: 服务将事件写入Kafka。一个消费者将原始数据实时写入ScyllaDB,保障OLTP需求。另一个Flink作业消费同样的数据流,进行预定义的聚合(例如计算每分钟的点击量、独立用户数),并将结果写入专门的OLAP数据库,如Druid或ClickHouse。
  • 优势:
    1. 极低延迟: 数据新鲜度可以达到秒级。
    2. 高查询性能: 由于数据是预聚合的,仪表盘上的固定指标查询会非常快。
  • 劣势:
    1. 灵活性差: 这是该方案的致命伤。它只能回答“预先定义好的问题”。如果数据科学家想探索一个新的、未被预聚合的维度组合,就必须修改Flink代码,重新部署作业,然后等待新数据生成。这完全违背了“探索性分析”的初衷。
    2. 运维复杂度更高: 在方案A的基础上,又引入了Kafka和Flink两个复杂的分布式系统。
    3. 原始数据查询问题: 如果要查询原始明细数据,依然需要回源到ScyllaDB,或者将明细数据也灌入Druid,但这会带来巨大的存储和索引成本。

这个方案虽然解决了延迟问题,却牺牲了查询的灵活性。对于一个探索性分析平台,这是不可接受的。此方案也被否决。

最终选择:基于Trino的联邦查询架构

我们需要的是一种既能利用ScyllaDB的实时写入能力,又能提供足够灵活的即席查询,同时避免复杂数据同步链路的方案。Trino(原PrestoSQL)进入了我们的视野。

Trino是一个分布式SQL查询引擎,它的核心思想是“连接一切数据源”。它可以将ScyllaDB/Cassandra伪装成一个关系型数据库表,然后用SQL进行查询。

graph TD
    subgraph Data Tier
        Service[推荐服务] -- 200k writes/sec --> ScyllaDB[(ScyllaDB Cluster)]
    end
    
    subgraph Query Tier
        TrinoCoordinator[Trino Coordinator]
        TrinoWorker1[Trino Worker]
        TrinoWorker2[Trino Worker]
        TrinoWorker3[Trino Worker]
        
        TrinoCoordinator --> TrinoWorker1
        TrinoCoordinator --> TrinoWorker2
        TrinoCoordinator --> TrinoWorker3
        
        TrinoWorker1 -- Cassandra Connector --> ScyllaDB
        TrinoWorker2 -- Cassandra Connector --> ScyllaDB
        TrinoWorker3 -- Cassandra Connector --> ScyllaDB
    end

    subgraph Application Tier
        SwiftUI_Dashboard[SwiftUI Dashboard on macOS] --> API_Gateway[Go API Gateway]
        API_Gateway -- SQL Query --> TrinoCoordinator
    end
  • 实现路径: ScyllaDB作为唯一的数据真理源。部署一个独立的Trino集群,通过其Cassandra Connector直接连接ScyllaDB。SwiftUI仪表盘通过一个轻量级的API网关向Trino提交查询请求。
  • 选择理由:
    1. 零数据冗余: 无需ETL,Trino直接查询ScyllaDB中的实时数据,数据新鲜度取决于ScyllaDB的写入延迟,即秒级。
    2. 极高的灵活性: 提供了完整的SQL能力,数据科学家可以编写任意复杂的查询,在原始数据上进行探索。
    3. 架构简化: 相比方案A和B,技术栈大大简化,只需要维护ScyllaDB和Trino两个核心组件。
    4. 水平扩展: Trino和ScyllaDB都是可水平扩展的架构,可以独立增减节点以应对负载变化。

当然,这个方案并非银弹。这里的坑在于,将OLAP负载直接施加在OLTP数据库上,必须进行精细的设计和调优,否则分析查询可能会拖垮线上服务。这正是我们工作的核心。

核心实现与调优细节

1. ScyllaDB Schema 设计

ScyllaDB的表结构设计是整个架构成功的基石。它必须同时兼顾高并发写入、快速单点查询以及可接受的全表扫描性能。

-- 用户行为事件表
CREATE TABLE product.user_events (
    -- 分区键: 使用 event_date 和 a_bucket (artificial bucket) 组合
    -- event_date: 保证数据按天分区,便于管理和历史数据归档
    -- a_bucket: 人工分桶,将一天内的数据分散到 N 个分区中,避免单分区过大。
    --           例如,可以基于 user_id % 100 来计算,将负载均匀分散。
    event_date date,
    a_bucket int,

    -- 集群键: 决定了分区内的数据排序,对范围查询至关重要
    event_timestamp timestamp,
    user_id uuid,
    event_id timeuuid,

    -- 其他事件属性
    event_type text,
    product_id text,
    session_id uuid,
    properties map<text, text>,

    PRIMARY KEY ((event_date, a_bucket), event_timestamp, user_id, event_id)
) WITH CLUSTERING ORDER BY (event_timestamp DESC);

设计考量:

  • 分区键 (event_date, a_bucket): 这是Trino性能的关键。Trino在查询ScyllaDB时,如果WHERE子句中包含了完整的分区键,它能够实现高效的“分区剪枝”,只读取相关的分区。我们的仪表盘查询通常会带上时间范围,event_date可以很好地利用这一点。a_bucket则纯粹是为了打散热点,防止单个分区在写入或读取时成为瓶颈。在真实项目中,a_bucket的数量需要根据每日数据量仔细评估。
  • 集群键 event_timestamp DESC: 这保证了每个分区内的数据按时间倒序排列。这对于获取“最近N条事件”这类查询非常高效。
  • 避免过度使用二级索引: ScyllaDB的二级索引在高性能场景下是反模式。我们选择通过设计更宽的分区键或物化视图来满足不同查询需求,而不是依赖二级索引。

2. Trino 连接器配置

Trino与ScyllaDB的连接配置直接影响查询效率和稳定性。以下是etc/catalog/scylla.properties文件的关键配置。

# etc/catalog/scylla.properties

connector.name=cassandra
cassandra.contact-points=scylla-node1.example.com,scylla-node2.example.com
cassandra.port=9042
cassandra.native-protocol-port=9042
cassandra.keyspace=product

# 核心调优参数

# 1. 一致性级别: 分析查询对一致性要求不高,使用 LOCAL_QUORUM 避免跨机房延迟,并减轻集群压力。
#    严禁使用 ANY 或 ALL。
cassandra.consistency-level=LOCAL_QUORUM

# 2. 分片大小: Trino 将 CQL 表的 token range 划分为多个 split 交给 worker 处理。
#    这个值需要根据数据分布和ScyllaDB节点性能进行调优。默认值可能过大。
#    我们从 128MB 开始测试。
cassandra.split-size=128MB

# 3. 分片策略: 对于 Vnode 环,size_based 更均匀。
cassandra.partition-splitter=size_based

# 4. 每次从ScyllaDB获取的数据行数。需要与JVM内存相匹配,避免OOM。
cassandra.fetch-size=2000

# 5. 负载均衡策略: TokenAwarePolicy是必须的,它能将请求路由到持有该数据的主节点。
#    DCAwareRoundRobinPolicy 保证请求只在本地数据中心循环,对多数据中心部署至关重要。
cassandra.load-policy=DCAwareRoundRobinPolicy:local_dc=dc1,used_hosts_per_remote_dc=0

# 6. 超时设置: 分析查询可能耗时较长,需要适当调高超时时间,防止Trino过早放弃。
cassandra.client.read-timeout=5m
cassandra.client.connect-timeout=30s

3. 中间层 Go API 网关

直接将Trino暴露给客户端是不可接受的,原因有三:安全、稳定性和可管理性。因此,我们构建了一个轻量级的Go API网关。

// main.go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"time"
)

var (
	trinoUser        = "data-scientist"
	trinoCoordinator = "http://trino-coordinator.example.com:8080"
	trinoCatalog     = "scylla"
	trinoSchema      = "product"
)

// QueryRequest 定义了客户端传入的请求结构
type QueryRequest struct {
	Query string `json:"query"`
}

func queryHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}
    
    // 1. 解析和校验请求
	var reqBody QueryRequest
	if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

    // 2. [安全] 在生产环境中,此处必须有SQL注入防护、查询复杂度分析和权限校验
    //    例如,可以解析AST,禁止某些危险操作,或者限制扫描的数据量。
    //    这里为了演示,仅做简单记录。
	log.Printf("Received query: %s", reqBody.Query)

    // 3. 构建对Trino的请求
	trinoReq, err := http.NewRequest("POST", trinoCoordinator+"/v1/statement", bytes.NewBufferString(reqBody.Query))
	if err != nil {
		http.Error(w, "Failed to create Trino request", http.StatusInternalServerError)
		return
	}

	trinoReq.Header.Set("Content-Type", "text/plain")
	trinoReq.Header.Set("X-Trino-User", trinoUser)
	trinoReq.Header.Set("X-Trino-Catalog", trinoCatalog)
	trinoReq.Header.Set("X-Trino-Schema", trinoSchema)
    trinoReq.Header.Set("X-Trino-Source", "SwiftUI-Dashboard")


    // 4. 执行查询并处理分页结果
	client := &http.Client{Timeout: 5 * time.Minute}
	resp, err := client.Do(trinoReq)
	if err != nil {
		http.Error(w, fmt.Sprintf("Failed to execute query on Trino: %v", err), http.StatusServiceUnavailable)
		return
	}
	defer resp.Body.Close()

    // Trino的API是异步的,它会返回一个nextUri用于获取结果
	w.Header().Set("Content-Type", "application/json")
	if err := processTrinoResponse(client, resp, w); err != nil {
        // 错误处理非常重要,需要将Trino的错误信息清晰地返回给客户端
        http.Error(w, fmt.Sprintf("Error processing Trino response: %v", err), http.StatusInternalServerError)
    }
}

// processTrinoResponse 负责处理Trino的分页响应
func processTrinoResponse(client *http.Client, initialResp *http.Response, w http.ResponseWriter) error {
	var bodyJSON map[string]interface{}
	if err := json.NewDecoder(initialResp.Body).Decode(&bodyJSON); err != nil {
		return fmt.Errorf("failed to decode initial response: %w", err)
	}

	encoder := json.NewEncoder(w)
    // 开启流式JSON编码
    w.Write([]byte(`{"results": [`))
    firstChunk := true

	nextURI, ok := bodyJSON["nextUri"].(string)
	for ok && nextURI != "" {
		nextReq, _ := http.NewRequest("GET", nextURI, nil)
		nextResp, err := client.Do(nextReq)
		if err != nil {
			return fmt.Errorf("failed to fetch next chunk: %w", err)
		}
		
		var chunkJSON map[string]interface{}
		bodyBytes, _ := io.ReadAll(nextResp.Body)
		nextResp.Body.Close()

		if err := json.Unmarshal(bodyBytes, &chunkJSON); err != nil {
			return fmt.Errorf("failed to decode chunk JSON: %w", err)
		}

        // 将数据部分流式写入响应
        if data, hasData := chunkJSON["data"].([]interface{}); hasData {
            for _, row := range data {
                if !firstChunk {
                    w.Write([]byte(`,`))
                }
                if err := encoder.Encode(row); err != nil {
                    return fmt.Errorf("failed to encode row: %w", err)
                }
                firstChunk = false
            }
        }
		
		nextURI, ok = chunkJSON["nextUri"].(string)
	}
    w.Write([]byte(`]}`))
	return nil
}

func main() {
	http.HandleFunc("/query", queryHandler)
	log.Println("API Gateway listening on :8888")
	if err := http.ListenAndServe(":8888", nil); err != nil {
		log.Fatal(err)
	}
}

4. SwiftUI 客户端实现

SwiftUI客户端需要处理好异步数据加载、状态管理和动态数据显示。

import SwiftUI

// 用于表示从API返回的一行数据,这里简化为字典
typealias QueryRow = [String: AnyDecodable]

// AnyDecodable 帮助我们解码异构的JSON
struct AnyDecodable: Decodable {
    let value: Any
    init(from decoder: Decoder) throws {
        let container = try decoder.singleValueContainer()
        if let intVal = try? container.decode(Int.self) { value = intVal }
        else if let doubleVal = try? container.decode(Double.self) { value = doubleVal }
        else if let stringVal = try? container.decode(String.self) { value = stringVal }
        else if let boolVal = try? container.decode(Bool.self) { value = boolVal }
        else { value = NSNull() }
    }
}

// API响应的顶层结构
struct QueryResponse: Decodable {
    let results: [QueryRow]
}

// ViewModel负责管理状态和网络请求
@MainActor
class DashboardViewModel: ObservableObject {
    enum ViewState {
        case idle
        case loading
        case loaded([QueryRow])
        case error(String)
    }

    @Published var state: ViewState = .idle
    @Published var query: String = """
    SELECT event_type, COUNT(DISTINCT user_id) AS distinct_users
    FROM user_events
    WHERE event_date = CURRENT_DATE - INTERVAL '1' DAY
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 100;
    """
    
    private var apiGatewayURL = URL(string: "http://localhost:8888/query")!

    func executeQuery() {
        guard !query.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty else {
            state = .error("Query cannot be empty.")
            return
        }
        
        state = .loading
        
        Task {
            do {
                // 1. 准备请求
                var request = URLRequest(url: apiGatewayURL)
                request.httpMethod = "POST"
                request.setValue("application/json", forHTTPHeaderField: "Content-Type")
                let requestBody = ["query": query]
                request.httpBody = try JSONEncoder().encode(requestBody)

                // 2. 发起网络请求
                // 在生产级应用中,这里的URLSession应该被更完善地配置,例如设置超时
                let (data, response) = try await URLSession.shared.data(for: request)

                guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200 else {
                    let errorBody = String(data: data, encoding: .utf8) ?? "Unknown server error"
                    throw URLError(.badServerResponse, userInfo: [NSLocalizedDescriptionKey: errorBody])
                }

                // 3. 解码JSON
                let decodedResponse = try JSONDecoder().decode(QueryResponse.self, from: data)
                state = .loaded(decodedResponse.results)

            } catch {
                // 4. 细致的错误处理
                logError(error)
                state = .error("Failed to execute query: \(error.localizedDescription)")
            }
        }
    }

    private func logError(_ error: Error) {
        // 在真实项目中,这里会连接到日志系统
        print("An error occurred: \(error)")
    }
}

// SwiftUI视图
struct DashboardView: View {
    @StateObject private var viewModel = DashboardViewModel()

    var body: some View {
        VStack(alignment: .leading, spacing: 10) {
            Text("Trino Query Editor")
                .font(.title)

            TextEditor(text: $viewModel.query)
                .font(.system(.body, design: .monospaced))
                .frame(height: 200)
                .border(Color.gray, width: 0.5)

            HStack {
                Button("Execute Query", action: viewModel.executeQuery)
                    .disabled(viewModel.state.isLoading)
                
                if viewModel.state.isLoading {
                    ProgressView()
                }
            }

            // 结果展示区域
            switch viewModel.state {
            case .idle:
                Text("Enter a query and click 'Execute'.")
                    .foregroundColor(.secondary)
            case .loading:
                Text("Querying Trino...")
                    .foregroundColor(.secondary)
            case .loaded(let rows):
                // 对于macOS和iPadOS,Table是展示表格数据的理想选择
                if #available(macOS 12.0, iOS 16.0, *) {
                    ResultTableView(rows: rows)
                } else {
                    // Fallback for older systems
                    Text("Results loaded, but Table view is unavailable on this OS version.")
                }
            case .error(let errorMessage):
                Text(errorMessage)
                    .foregroundColor(.red)
            }
            
            Spacer()
        }
        .padding()
    }
}

// 扩展来检查加载状态
extension DashboardViewModel.ViewState {
    var isLoading: Bool {
        if case .loading = self { return true }
        return false
    }
}

// 使用SwiftUI的Table来展示结果
@available(macOS 12.0, iOS 16.0, *)
struct ResultTableView: View {
    let rows: [QueryRow]
    let columns: [String]

    init(rows: [QueryRow]) {
        self.rows = rows
        self.columns = rows.first?.keys.sorted() ?? []
    }

    var body: some View {
        // Table需要唯一的行标识符
        Table(rows, selection: .constant(Set<QueryRow.ID>())) {
            ForEach(columns, id: \.self) { columnKey in
                // 构建动态列
                TableColumn(columnKey) { row in
                    Text(displayValue(for: row[columnKey]?.value))
                }
            }
        }
    }
    
    private func displayValue(for value: Any?) -> String {
        guard let value = value, !(value is NSNull) else { return "NULL" }
        return String(describing: value)
    }
}

// 让字典可以被Identifiable
extension Dictionary: Identifiable where Key == String, Value == AnyDecodable {
    public var id: String {
        // 为表格行生成一个稳定的ID,这里用所有值的字符串拼接,不是最高效但可行
        return self.map { "\($0.key):\($0.value.value)" }.joined(separator: "|")
    }
}

架构的扩展性与局限性

此架构的优势在于其优雅的简洁性和水平扩展能力。当ScyllaDB集群压力增大时,可以增加节点。当分析查询变慢时,可以增加Trino worker节点。两者解耦,可以独立扩展。

然而,它的局限性也同样明显。首先,这套方案并未真正实现OLTP和OLAP的工作负载隔离。一个设计不佳的、需要扫描海量数据的Trino查询,仍然可能消耗ScyllaDB大量的CPU和IO资源,潜在地影响线上服务的延迟。在生产环境中,必须通过Trino的资源组(Resource Groups)功能对查询进行严格的资源限制和排队管理,防止“坏查询”拖垮整个系统。

其次,对于需要分析数月甚至数年历史数据的场景,直接扫描ScyllaDB的性能会急剧下降。当前架构最适合的是对“热”或“温”数据的近实时分析。一个可行的演进方向是混合架构:为Trino配置第二个数据源,如一个指向S3上历史数据(Iceberg/Hudi格式)的Hive Connector。这样,Trino可以在一个查询中无缝地联合ScyllaDB中的实时数据和S3中的历史数据,实现真正的HTAP(Hybrid transactional/analytical processing)能力。


  目录