我们面临一个具体的工程挑战:一个高并发的AI推荐服务,每秒产生数十万次用户行为事件。这些事件必须以极低的延迟写入数据库,用于实时用户画像的更新与读取,这是典型的OLTP场景。同时,数据科学家和运营团队需要一个交互式仪表盘,对这些最近几小时甚至几天内的原始事件数据进行探索性分析(ad-hoc),以验证模型假设、诊断异常或发现新特征。
业务对这个分析仪表盘的核心要求是:数据新鲜度必须在分钟级别,复杂聚合查询的P95延迟必须在10秒以内。传统的解决方案无法满足这种对新鲜度和交互性的双重苛求。
定义问题边界与约束
首先,我们量化一下技术指标:
- 写入吞吐: 峰值可达 200,000 writes/sec。
- 单点读取延迟: P99 < 10ms (用于线上服务)。
- 数据总量: 每日增量约 5TB 原始事件。
- 分析查询模式: Ad-hoc,包含
GROUP BY
,JOIN
,COUNT(DISTINCT)
, 窗口函数等。 - 分析查询延迟: P95 < 10s。
- 数据新鲜度: 分析仪表盘看到的数据延迟不超过5分钟。
- 架构约束: 尽可能减少数据冗余和ETL管道的复杂性,控制运维成本。
ScyllaDB因其与Cassandra兼容的API和极致的I/O性能,已被选定为线上服务的核心OLTP数据库,它完全能满足写入和单点读取的需求。问题的核心在于如何优雅地嫁接上高性能的OLAP能力。
方案A:经典的离线数仓架构
这是最容易想到的方案。架构如下:
graph TD subgraph OLTP Service[推荐服务] --> ScyllaDB end subgraph ETL Pipeline ScyllaDB -- 定时导出 (Spark Batch) --> DataLake[数据湖 S3/HDFS] DataLake -- T+1 加载 --> DWH[数据仓库 ClickHouse/Snowflake] end subgraph OLAP Dashboard[分析仪表盘] --> DWH end
- 实现路径: 每天凌晨通过Spark作业,将ScyllaDB的数据快照导出为Parquet格式存入S3。然后,另一个作业将这些Parquet文件加载到ClickHouse集群中。SwiftUI构建的仪表盘直接查询ClickHouse。
- 优势:
- 物理隔离: OLTP和OLAP负载完全分离,分析查询的性能风暴不会影响线上服务。
- 成熟稳定: 这是业界最成熟的方案,生态工具链非常完善。
- 查询性能: ClickHouse这类列式存储数据库对聚合查询的性能优化是极致的。
- 劣势:
- 数据延迟: 整个流程是
T+1
的,数据新鲜度是天级别,完全不满足“分钟级”的核心需求。 - 存储成本: 数据在ScyllaDB、S3、ClickHouse中存储了三份,成本高昂。
- 运维复杂度: 维护一个Spark集群和ClickHouse集群,以及调度ETL作业,带来了显著的运维负担。
- 数据延迟: 整个流程是
在我们的场景下,数据延迟是不可逾越的障碍。此方案被直接否决。
方案B:流式计算实时聚合架构
为了解决数据延迟问题,自然会想到流式处理。
graph TD subgraph Ingestion Service[推荐服务] --> Kafka end subgraph Data Flow Kafka -- Stream 1 --> ScyllaDB[(ScyllaDB)] Kafka -- Stream 2 --> Flink[Flink实时聚合] end subgraph Storage & Serving Flink --> OLAP_DB[实时OLAP库 Druid/ClickHouse] Dashboard[分析仪表盘] --> OLAP_DB end
- 实现路径: 服务将事件写入Kafka。一个消费者将原始数据实时写入ScyllaDB,保障OLTP需求。另一个Flink作业消费同样的数据流,进行预定义的聚合(例如计算每分钟的点击量、独立用户数),并将结果写入专门的OLAP数据库,如Druid或ClickHouse。
- 优势:
- 极低延迟: 数据新鲜度可以达到秒级。
- 高查询性能: 由于数据是预聚合的,仪表盘上的固定指标查询会非常快。
- 劣势:
- 灵活性差: 这是该方案的致命伤。它只能回答“预先定义好的问题”。如果数据科学家想探索一个新的、未被预聚合的维度组合,就必须修改Flink代码,重新部署作业,然后等待新数据生成。这完全违背了“探索性分析”的初衷。
- 运维复杂度更高: 在方案A的基础上,又引入了Kafka和Flink两个复杂的分布式系统。
- 原始数据查询问题: 如果要查询原始明细数据,依然需要回源到ScyllaDB,或者将明细数据也灌入Druid,但这会带来巨大的存储和索引成本。
这个方案虽然解决了延迟问题,却牺牲了查询的灵活性。对于一个探索性分析平台,这是不可接受的。此方案也被否决。
最终选择:基于Trino的联邦查询架构
我们需要的是一种既能利用ScyllaDB的实时写入能力,又能提供足够灵活的即席查询,同时避免复杂数据同步链路的方案。Trino(原PrestoSQL)进入了我们的视野。
Trino是一个分布式SQL查询引擎,它的核心思想是“连接一切数据源”。它可以将ScyllaDB/Cassandra伪装成一个关系型数据库表,然后用SQL进行查询。
graph TD subgraph Data Tier Service[推荐服务] -- 200k writes/sec --> ScyllaDB[(ScyllaDB Cluster)] end subgraph Query Tier TrinoCoordinator[Trino Coordinator] TrinoWorker1[Trino Worker] TrinoWorker2[Trino Worker] TrinoWorker3[Trino Worker] TrinoCoordinator --> TrinoWorker1 TrinoCoordinator --> TrinoWorker2 TrinoCoordinator --> TrinoWorker3 TrinoWorker1 -- Cassandra Connector --> ScyllaDB TrinoWorker2 -- Cassandra Connector --> ScyllaDB TrinoWorker3 -- Cassandra Connector --> ScyllaDB end subgraph Application Tier SwiftUI_Dashboard[SwiftUI Dashboard on macOS] --> API_Gateway[Go API Gateway] API_Gateway -- SQL Query --> TrinoCoordinator end
- 实现路径: ScyllaDB作为唯一的数据真理源。部署一个独立的Trino集群,通过其Cassandra Connector直接连接ScyllaDB。SwiftUI仪表盘通过一个轻量级的API网关向Trino提交查询请求。
- 选择理由:
- 零数据冗余: 无需ETL,Trino直接查询ScyllaDB中的实时数据,数据新鲜度取决于ScyllaDB的写入延迟,即秒级。
- 极高的灵活性: 提供了完整的SQL能力,数据科学家可以编写任意复杂的查询,在原始数据上进行探索。
- 架构简化: 相比方案A和B,技术栈大大简化,只需要维护ScyllaDB和Trino两个核心组件。
- 水平扩展: Trino和ScyllaDB都是可水平扩展的架构,可以独立增减节点以应对负载变化。
当然,这个方案并非银弹。这里的坑在于,将OLAP负载直接施加在OLTP数据库上,必须进行精细的设计和调优,否则分析查询可能会拖垮线上服务。这正是我们工作的核心。
核心实现与调优细节
1. ScyllaDB Schema 设计
ScyllaDB的表结构设计是整个架构成功的基石。它必须同时兼顾高并发写入、快速单点查询以及可接受的全表扫描性能。
-- 用户行为事件表
CREATE TABLE product.user_events (
-- 分区键: 使用 event_date 和 a_bucket (artificial bucket) 组合
-- event_date: 保证数据按天分区,便于管理和历史数据归档
-- a_bucket: 人工分桶,将一天内的数据分散到 N 个分区中,避免单分区过大。
-- 例如,可以基于 user_id % 100 来计算,将负载均匀分散。
event_date date,
a_bucket int,
-- 集群键: 决定了分区内的数据排序,对范围查询至关重要
event_timestamp timestamp,
user_id uuid,
event_id timeuuid,
-- 其他事件属性
event_type text,
product_id text,
session_id uuid,
properties map<text, text>,
PRIMARY KEY ((event_date, a_bucket), event_timestamp, user_id, event_id)
) WITH CLUSTERING ORDER BY (event_timestamp DESC);
设计考量:
- 分区键
(event_date, a_bucket)
: 这是Trino性能的关键。Trino在查询ScyllaDB时,如果WHERE子句中包含了完整的分区键,它能够实现高效的“分区剪枝”,只读取相关的分区。我们的仪表盘查询通常会带上时间范围,event_date
可以很好地利用这一点。a_bucket
则纯粹是为了打散热点,防止单个分区在写入或读取时成为瓶颈。在真实项目中,a_bucket
的数量需要根据每日数据量仔细评估。 - 集群键
event_timestamp DESC
: 这保证了每个分区内的数据按时间倒序排列。这对于获取“最近N条事件”这类查询非常高效。 - 避免过度使用二级索引: ScyllaDB的二级索引在高性能场景下是反模式。我们选择通过设计更宽的分区键或物化视图来满足不同查询需求,而不是依赖二级索引。
2. Trino 连接器配置
Trino与ScyllaDB的连接配置直接影响查询效率和稳定性。以下是etc/catalog/scylla.properties
文件的关键配置。
# etc/catalog/scylla.properties
connector.name=cassandra
cassandra.contact-points=scylla-node1.example.com,scylla-node2.example.com
cassandra.port=9042
cassandra.native-protocol-port=9042
cassandra.keyspace=product
# 核心调优参数
# 1. 一致性级别: 分析查询对一致性要求不高,使用 LOCAL_QUORUM 避免跨机房延迟,并减轻集群压力。
# 严禁使用 ANY 或 ALL。
cassandra.consistency-level=LOCAL_QUORUM
# 2. 分片大小: Trino 将 CQL 表的 token range 划分为多个 split 交给 worker 处理。
# 这个值需要根据数据分布和ScyllaDB节点性能进行调优。默认值可能过大。
# 我们从 128MB 开始测试。
cassandra.split-size=128MB
# 3. 分片策略: 对于 Vnode 环,size_based 更均匀。
cassandra.partition-splitter=size_based
# 4. 每次从ScyllaDB获取的数据行数。需要与JVM内存相匹配,避免OOM。
cassandra.fetch-size=2000
# 5. 负载均衡策略: TokenAwarePolicy是必须的,它能将请求路由到持有该数据的主节点。
# DCAwareRoundRobinPolicy 保证请求只在本地数据中心循环,对多数据中心部署至关重要。
cassandra.load-policy=DCAwareRoundRobinPolicy:local_dc=dc1,used_hosts_per_remote_dc=0
# 6. 超时设置: 分析查询可能耗时较长,需要适当调高超时时间,防止Trino过早放弃。
cassandra.client.read-timeout=5m
cassandra.client.connect-timeout=30s
3. 中间层 Go API 网关
直接将Trino暴露给客户端是不可接受的,原因有三:安全、稳定性和可管理性。因此,我们构建了一个轻量级的Go API网关。
// main.go
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"time"
)
var (
trinoUser = "data-scientist"
trinoCoordinator = "http://trino-coordinator.example.com:8080"
trinoCatalog = "scylla"
trinoSchema = "product"
)
// QueryRequest 定义了客户端传入的请求结构
type QueryRequest struct {
Query string `json:"query"`
}
func queryHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
// 1. 解析和校验请求
var reqBody QueryRequest
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
// 2. [安全] 在生产环境中,此处必须有SQL注入防护、查询复杂度分析和权限校验
// 例如,可以解析AST,禁止某些危险操作,或者限制扫描的数据量。
// 这里为了演示,仅做简单记录。
log.Printf("Received query: %s", reqBody.Query)
// 3. 构建对Trino的请求
trinoReq, err := http.NewRequest("POST", trinoCoordinator+"/v1/statement", bytes.NewBufferString(reqBody.Query))
if err != nil {
http.Error(w, "Failed to create Trino request", http.StatusInternalServerError)
return
}
trinoReq.Header.Set("Content-Type", "text/plain")
trinoReq.Header.Set("X-Trino-User", trinoUser)
trinoReq.Header.Set("X-Trino-Catalog", trinoCatalog)
trinoReq.Header.Set("X-Trino-Schema", trinoSchema)
trinoReq.Header.Set("X-Trino-Source", "SwiftUI-Dashboard")
// 4. 执行查询并处理分页结果
client := &http.Client{Timeout: 5 * time.Minute}
resp, err := client.Do(trinoReq)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to execute query on Trino: %v", err), http.StatusServiceUnavailable)
return
}
defer resp.Body.Close()
// Trino的API是异步的,它会返回一个nextUri用于获取结果
w.Header().Set("Content-Type", "application/json")
if err := processTrinoResponse(client, resp, w); err != nil {
// 错误处理非常重要,需要将Trino的错误信息清晰地返回给客户端
http.Error(w, fmt.Sprintf("Error processing Trino response: %v", err), http.StatusInternalServerError)
}
}
// processTrinoResponse 负责处理Trino的分页响应
func processTrinoResponse(client *http.Client, initialResp *http.Response, w http.ResponseWriter) error {
var bodyJSON map[string]interface{}
if err := json.NewDecoder(initialResp.Body).Decode(&bodyJSON); err != nil {
return fmt.Errorf("failed to decode initial response: %w", err)
}
encoder := json.NewEncoder(w)
// 开启流式JSON编码
w.Write([]byte(`{"results": [`))
firstChunk := true
nextURI, ok := bodyJSON["nextUri"].(string)
for ok && nextURI != "" {
nextReq, _ := http.NewRequest("GET", nextURI, nil)
nextResp, err := client.Do(nextReq)
if err != nil {
return fmt.Errorf("failed to fetch next chunk: %w", err)
}
var chunkJSON map[string]interface{}
bodyBytes, _ := io.ReadAll(nextResp.Body)
nextResp.Body.Close()
if err := json.Unmarshal(bodyBytes, &chunkJSON); err != nil {
return fmt.Errorf("failed to decode chunk JSON: %w", err)
}
// 将数据部分流式写入响应
if data, hasData := chunkJSON["data"].([]interface{}); hasData {
for _, row := range data {
if !firstChunk {
w.Write([]byte(`,`))
}
if err := encoder.Encode(row); err != nil {
return fmt.Errorf("failed to encode row: %w", err)
}
firstChunk = false
}
}
nextURI, ok = chunkJSON["nextUri"].(string)
}
w.Write([]byte(`]}`))
return nil
}
func main() {
http.HandleFunc("/query", queryHandler)
log.Println("API Gateway listening on :8888")
if err := http.ListenAndServe(":8888", nil); err != nil {
log.Fatal(err)
}
}
4. SwiftUI 客户端实现
SwiftUI客户端需要处理好异步数据加载、状态管理和动态数据显示。
import SwiftUI
// 用于表示从API返回的一行数据,这里简化为字典
typealias QueryRow = [String: AnyDecodable]
// AnyDecodable 帮助我们解码异构的JSON
struct AnyDecodable: Decodable {
let value: Any
init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
if let intVal = try? container.decode(Int.self) { value = intVal }
else if let doubleVal = try? container.decode(Double.self) { value = doubleVal }
else if let stringVal = try? container.decode(String.self) { value = stringVal }
else if let boolVal = try? container.decode(Bool.self) { value = boolVal }
else { value = NSNull() }
}
}
// API响应的顶层结构
struct QueryResponse: Decodable {
let results: [QueryRow]
}
// ViewModel负责管理状态和网络请求
@MainActor
class DashboardViewModel: ObservableObject {
enum ViewState {
case idle
case loading
case loaded([QueryRow])
case error(String)
}
@Published var state: ViewState = .idle
@Published var query: String = """
SELECT event_type, COUNT(DISTINCT user_id) AS distinct_users
FROM user_events
WHERE event_date = CURRENT_DATE - INTERVAL '1' DAY
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;
"""
private var apiGatewayURL = URL(string: "http://localhost:8888/query")!
func executeQuery() {
guard !query.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty else {
state = .error("Query cannot be empty.")
return
}
state = .loading
Task {
do {
// 1. 准备请求
var request = URLRequest(url: apiGatewayURL)
request.httpMethod = "POST"
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
let requestBody = ["query": query]
request.httpBody = try JSONEncoder().encode(requestBody)
// 2. 发起网络请求
// 在生产级应用中,这里的URLSession应该被更完善地配置,例如设置超时
let (data, response) = try await URLSession.shared.data(for: request)
guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200 else {
let errorBody = String(data: data, encoding: .utf8) ?? "Unknown server error"
throw URLError(.badServerResponse, userInfo: [NSLocalizedDescriptionKey: errorBody])
}
// 3. 解码JSON
let decodedResponse = try JSONDecoder().decode(QueryResponse.self, from: data)
state = .loaded(decodedResponse.results)
} catch {
// 4. 细致的错误处理
logError(error)
state = .error("Failed to execute query: \(error.localizedDescription)")
}
}
}
private func logError(_ error: Error) {
// 在真实项目中,这里会连接到日志系统
print("An error occurred: \(error)")
}
}
// SwiftUI视图
struct DashboardView: View {
@StateObject private var viewModel = DashboardViewModel()
var body: some View {
VStack(alignment: .leading, spacing: 10) {
Text("Trino Query Editor")
.font(.title)
TextEditor(text: $viewModel.query)
.font(.system(.body, design: .monospaced))
.frame(height: 200)
.border(Color.gray, width: 0.5)
HStack {
Button("Execute Query", action: viewModel.executeQuery)
.disabled(viewModel.state.isLoading)
if viewModel.state.isLoading {
ProgressView()
}
}
// 结果展示区域
switch viewModel.state {
case .idle:
Text("Enter a query and click 'Execute'.")
.foregroundColor(.secondary)
case .loading:
Text("Querying Trino...")
.foregroundColor(.secondary)
case .loaded(let rows):
// 对于macOS和iPadOS,Table是展示表格数据的理想选择
if #available(macOS 12.0, iOS 16.0, *) {
ResultTableView(rows: rows)
} else {
// Fallback for older systems
Text("Results loaded, but Table view is unavailable on this OS version.")
}
case .error(let errorMessage):
Text(errorMessage)
.foregroundColor(.red)
}
Spacer()
}
.padding()
}
}
// 扩展来检查加载状态
extension DashboardViewModel.ViewState {
var isLoading: Bool {
if case .loading = self { return true }
return false
}
}
// 使用SwiftUI的Table来展示结果
@available(macOS 12.0, iOS 16.0, *)
struct ResultTableView: View {
let rows: [QueryRow]
let columns: [String]
init(rows: [QueryRow]) {
self.rows = rows
self.columns = rows.first?.keys.sorted() ?? []
}
var body: some View {
// Table需要唯一的行标识符
Table(rows, selection: .constant(Set<QueryRow.ID>())) {
ForEach(columns, id: \.self) { columnKey in
// 构建动态列
TableColumn(columnKey) { row in
Text(displayValue(for: row[columnKey]?.value))
}
}
}
}
private func displayValue(for value: Any?) -> String {
guard let value = value, !(value is NSNull) else { return "NULL" }
return String(describing: value)
}
}
// 让字典可以被Identifiable
extension Dictionary: Identifiable where Key == String, Value == AnyDecodable {
public var id: String {
// 为表格行生成一个稳定的ID,这里用所有值的字符串拼接,不是最高效但可行
return self.map { "\($0.key):\($0.value.value)" }.joined(separator: "|")
}
}
架构的扩展性与局限性
此架构的优势在于其优雅的简洁性和水平扩展能力。当ScyllaDB集群压力增大时,可以增加节点。当分析查询变慢时,可以增加Trino worker节点。两者解耦,可以独立扩展。
然而,它的局限性也同样明显。首先,这套方案并未真正实现OLTP和OLAP的工作负载隔离。一个设计不佳的、需要扫描海量数据的Trino查询,仍然可能消耗ScyllaDB大量的CPU和IO资源,潜在地影响线上服务的延迟。在生产环境中,必须通过Trino的资源组(Resource Groups)功能对查询进行严格的资源限制和排队管理,防止“坏查询”拖垮整个系统。
其次,对于需要分析数月甚至数年历史数据的场景,直接扫描ScyllaDB的性能会急剧下降。当前架构最适合的是对“热”或“温”数据的近实时分析。一个可行的演进方向是混合架构:为Trino配置第二个数据源,如一个指向S3上历史数据(Iceberg/Hudi格式)的Hive Connector。这样,Trino可以在一个查询中无缝地联合ScyllaDB中的实时数据和S3中的历史数据,实现真正的HTAP(Hybrid transactional/analytical processing)能力。