一个看似简单的同步 LLM 调用是许多生产事故的开端。
// 这是最直接的方式,也是最脆弱的方式。
[ApiController]
[Route("api/generate")]
public class GenerationController : ControllerBase
{
private readonly LlmApiClient _llmClient;
public GenerationController(LlmApiClient llmClient)
{
_llmClient = llmClient;
}
[HttpPost]
public async Task<IActionResult> Generate([FromBody] string prompt)
{
// 阻塞前端请求,直到LLM返回结果
// LLM服务慢?整个API跟着慢。
// LLM服务挂了?API直接返回500。
// 瞬时流量高峰?直接打垮LLM服务。
var result = await _llmClient.GenerateTextAsync(prompt);
return Ok(result);
}
}
在真实项目中,这种代码无法存活。它将HTTP请求的生命周期与一个缓慢且不可预测的下游依赖(LLM API)紧密耦合。我们需要解耦,需要缓冲,需要一种机制来处理失败、重试和横向扩展。这自然引出了基于队列的异步处理架构。
我们的技术栈是.NET,因此第一反应可能是RabbitMQ或Kafka。但对于许多场景,这两种选择都显得过于重型。我们需要的是一个轻量、持久化、支持消费者组且性能优异的中间件。Redis Streams 恰好填补了这个生态位。它不像Redis Pub/Sub那样发完即忘,也不像Redis List那样需要复杂的逻辑来模拟消费者组,它提供了恰到好处的持久化和消费模型。
本文的目标,就是从零开始,用C#构建一个围绕Redis Streams的、生产级的LLM推理任务管道,并利用Datadog实现从请求入口到任务完成的全链路深度可观测性。我们将不仅关注“如何实现”,更关注“为何如此设计”以及如何处理现实世界中的故障。
架构蓝图:解耦与缓冲
在动手写代码前,我们先用流程图定义清晰的系统边界和数据流。
graph TD subgraph "Producer Service (ASP.NET Core API)" A[HTTP POST /api/tasks] --> B{Request Validation}; B --> C[Generate Task ID]; C --> D[Construct Stream Message]; D --> |XADD llm:tasks| E(Redis Stream); end subgraph "Consumer Service (Background Worker)" E --> |XREADGROUP GroupA Consumer1| F[Consumer Instance 1]; E --> |XREADGROUP GroupA Consumer2| G[Consumer Instance 2]; E --> |...| H[...]; F --> I{LLM API Call}; G --> I; I --> J{Process Result}; J -- Success --> K[XACK]; J -- Failure --> L{Retry Logic}; L -- Max Retries Reached --> M[Move to DLQ Stream]; M --> |XADD llm:tasks:dlq| N(Dead Letter Stream); L -- Can Retry --> K[XACK]; end subgraph "Observability Platform (Datadog)" A -- Trace --> O(APM); F -- Trace --> O; G -- Trace --> O; A -- Logs --> P(Logs); F -- Logs --> P; G -- Logs --> P; F -- Metrics --> Q(Metrics); G -- Metrics --> Q; end style E fill:#f9f,stroke:#333,stroke-width:2px style N fill:#ff6347,stroke:#333,stroke-width:2px
这个架构的核心思想是:
- 生产者(API服务) 的职责非常单一:验证请求,将其转化为一个持久化的任务消息,然后快速响应客户端。它不关心任务何时、由谁执行。
- Redis Stream (
llm:tasks
) 作为任务的缓冲和持久化层。 - 消费者(后台服务) 组成一个消费者组 (
GroupA
),它们竞争性地从流中拉取任务。这天然地实现了负载均衡和高可用:一个消费者宕机,其他消费者会自动接管其未完成的任务。 - 死信队列 (
llm:tasks:dlq
) 用于隔离处理失败的“毒丸”消息,防止它们阻塞整个处理流程。 - Datadog 贯穿始终,捕获每个环节的追踪、指标和日志数据,并将它们关联起来。
Phase 1: 任务生产者实现
生产者是一个ASP.NET Core Web API。它的关键是与StackExchange.Redis
的集成。
依赖配置与连接管理
一个常见的错误是为每个请求创建新的ConnectionMultiplexer
实例。这是极其昂贵的操作。ConnectionMultiplexer
被设计为单例,在整个应用程序生命周期内共享。
// Program.cs
using StackExchange.Redis;
// ... other services
var redisConnectionString = builder.Configuration.GetConnectionString("Redis");
if (string.IsNullOrEmpty(redisConnectionString))
{
throw new InvalidOperationException("Redis connection string is not configured.");
}
// 注册为单例
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect(redisConnectionString)
);
builder.Services.AddSingleton<ITaskProducer, RedisTaskProducer>();
// ... Datadog APM and Logging configuration would go here
生产者核心代码
生产者接口很简单,但实现细节很重要,尤其是错误处理和日志记录。
// ITaskProducer.cs
public interface ITaskProducer
{
Task<string> EnqueueTaskAsync(string prompt, Dictionary<string, string>? metadata = null);
}
// RedisTaskProducer.cs
using Serilog;
using StackExchange.Redis;
using System.Diagnostics;
public class RedisTaskProducer : ITaskProducer
{
private readonly IConnectionMultiplexer _redis;
private readonly IDatabase _database;
private readonly ILogger _logger = Log.ForContext<RedisTaskProducer>();
// Redis stream and consumer group names
private const string StreamName = "llm:tasks";
public RedisTaskProducer(IConnectionMultiplexer redis)
{
_redis = redis;
_database = _redis.GetDatabase();
}
public async Task<string> EnqueueTaskAsync(string prompt, Dictionary<string, string>? metadata = null)
{
var taskId = Guid.NewGuid().ToString("N");
// Datadog APM: 获取当前活动的 span
var activeSpan = Datadog.Trace.Tracer.Instance.ActiveScope?.Span;
var message = new NameValueEntry[]
{
new("task_id", taskId),
new("prompt", prompt),
new("enqueue_time_utc", DateTime.UtcNow.ToString("o")),
// 关键:注入追踪上下文,用于跨进程传播
new("dd_trace_id", activeSpan?.TraceId.ToString() ?? "0"),
new("dd_span_id", activeSinactiveSpan?.SpanId.ToString() ?? "0")
};
try
{
// XADD 命令将新条目添加到流的末尾
var messageId = await _database.StreamAddAsync(StreamName, message);
_logger.Information(
"Task {TaskId} enqueued to stream {StreamName} with message ID {MessageId}",
taskId, StreamName, messageId);
// 使用 Datadog 的 DogStatsD 客户端发送自定义指标
Datadog.Trace.DogStatsd.Increment("llm.tasks.enqueued", tags: new[] { "stream:llm-tasks" });
return taskId;
}
catch (RedisConnectionException ex)
{
_logger.Error(ex, "Failed to enqueue task {TaskId} due to Redis connection error.", taskId);
Datadog.Trace.DogStatsd.Increment("llm.tasks.enqueue.errors", tags: new[] { "reason:connection" });
throw; // 向上抛出,让上层处理(例如返回 503 Service Unavailable)
}
catch (Exception ex)
{
_logger.Error(ex, "An unexpected error occurred while enqueuing task {TaskId}.", taskId);
Datadog.Trace.DogStatsd.Increment("llm.tasks.enqueue.errors", tags: new[] { "reason:unknown" });
throw;
}
}
}
这段代码有几个关键点:
- 注入追踪上下文: 这是实现分布式追踪的核心。我们将Datadog的
trace_id
和span_id
作为消息的一部分放入Stream。消费者稍后会提取这些ID来续接追踪链。 - 结构化日志: 使用Serilog,并记录关键的上下文信息如
TaskId
和StreamName
。当配置了Datadog日志集成后,这些日志会自动与对应的Trace关联。 - 自定义指标:
DogStatsd.Increment
用于发送一个计数器指标。我们可以在Datadog仪表盘上实时看到任务入队的速率和失败情况。
Phase 2: 可靠的消费者实现
消费者是一个后台服务,它使用IHostedService
实现,随应用程序启动而运行。
消费者组的设置
消费者组只需要创建一次。我们可以在消费者服务启动时进行检查和创建。
// 在消费者服务的启动逻辑中
var db = _redis.GetDatabase();
const string streamName = "llm:tasks";
const string groupName = "llm-processor-group";
try
{
if (!(await db.KeyExistsAsync(streamName)) || (await db.StreamGroupInfoAsync(streamName)).All(g => g.Name != groupName))
{
// 从流的开始创建组,"$": 从末尾开始,"0-0": 从头开始
await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", createStream: true);
_logger.Information("Created Redis Stream consumer group {GroupName} for stream {StreamName}", groupName, streamName);
}
}
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
{
// 忽略组已存在的错误,这是并发场景下的正常现象
_logger.Warning("Consumer group {GroupName} already exists.", groupName);
}
消费循环与故障处理
消费者的核心是一个无限循环,它使用XREADGROUP
阻塞式地等待新消息。
// LlmTaskConsumerService.cs (IHostedService implementation)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.Information("LLM Task Consumer Service is starting.");
// ... 消费者组创建逻辑 ...
var consumerName = $"consumer-{Environment.MachineName}-{Process.GetCurrentProcess().Id}";
while (!stoppingToken.IsCancellationRequested)
{
try
{
// ">" 表示只读取从未传递给该消费者的任何其他消费者的消息
var entries = await _database.StreamReadGroupAsync(StreamName, GroupName, consumerName, ">", count: 1, milliseconds: 5000);
if (entries.Length == 0)
{
continue; // 没有新消息,继续等待
}
foreach (var entry in entries)
{
await ProcessEntry(entry, consumerName, stoppingToken);
}
}
catch (RedisConnectionException ex)
{
_logger.Error(ex, "Redis connection lost. Retrying in 5 seconds...");
await Task.Delay(5000, stoppingToken);
}
catch (Exception ex)
{
_logger.Error(ex, "Unhandled exception in consumer loop. Restarting loop.");
await Task.Delay(1000, stoppingToken);
}
}
}
ProcessEntry
方法是处理单个任务的核心。这里的健壮性设计至关重要。
private async Task ProcessEntry(StreamEntry entry, string consumerName, CancellationToken stoppingToken)
{
var messageId = entry.Id;
var task = entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
task.TryGetValue("task_id", out var taskId);
// 续接分布式追踪
Datadog.Trace.TraceContext? propagatedContext = null;
if (task.TryGetValue("dd_trace_id", out var traceIdStr) && ulong.TryParse(traceIdStr, out var traceId) &&
task.TryGetValue("dd_span_id", out var parentIdStr) && ulong.TryParse(parentIdStr, out var parentId))
{
propagatedContext = new Datadog.Trace.TraceContext(traceId, parentId);
}
using (var scope = Datadog.Trace.Tracer.Instance.StartActive("llm.process_task", new Datadog.Trace.SpanCreationSettings { Parent = propagatedContext }))
{
var span = scope.Span;
span.ResourceName = $"{GroupName}:{consumerName}";
span.SetTag("task_id", taskId);
span.SetTag("redis.stream", StreamName);
span.SetTag("redis.message_id", messageId);
var log = _logger.ForContext("TaskId", taskId);
try
{
log.Information("Processing task.");
var prompt = task["prompt"];
// 模拟LLM调用
var result = await _llmApiClient.GenerateAsync(prompt, stoppingToken);
log.Information("LLM call successful. Result length: {Length}", result.Length);
// 业务处理完成,确认消息
await _database.StreamAcknowledgeAsync(StreamName, GroupName, messageId);
Datadog.Trace.DogStatsd.Increment("llm.tasks.processed", tags: new[] { "status:success" });
Datadog.Trace.DogStatsd.Distribution("llm.tasks.processing_time", (DateTime.UtcNow - span.StartTime.UtcDateTime).TotalMilliseconds);
}
catch (Exception ex)
{
span.SetException(ex);
log.Error(ex, "Failed to process task.");
Datadog.Trace.DogStatsd.Increment("llm.tasks.processed", tags: new[] { "status:failure" });
// 在真实项目中,这里应该有更复杂的重试和DLQ逻辑。
// 为简化,我们直接确认,避免无限重试。
// 正确的做法是检查重试次数,如果超限则移入DLQ。
await HandleProcessingFailure(entry);
}
}
}
Phase 3: 实现死信队列(DLQ)与僵尸消息处理
仅仅捕获异常是不够的。如果一条消息本身有问题(例如,格式错误导致解析失败),它会被反复消费、反复失败,这就是“毒丸”消息。消费者组有一个特性:如果消息被消费但没有被XACK
,在超时后它会重新分配给组内的其他消费者。这本是为容错设计的,但遇上毒丸消息就成了灾难。
我们需要一个DLQ机制。当一条消息处理失败达到预设次数后,就不再重试,而是将其转移到一个专门的llm:tasks:dlq
流中,等待人工干预或特殊逻辑处理。
同时,我们还需要处理“僵尸消息”:一个消费者拉取了消息,但在处理完成(XACK
)前就崩溃了。这条消息会一直处于“待处理”(Pending)状态。我们需要一个独立的清理进程,定期检查这些僵尸消息,并将它们重新分配给活跃的消费者。
// 僵尸消息清理逻辑 (可以放在另一个IHostedService中)
public async Task ReclaimStalePendingMessagesAsync(CancellationToken stoppingToken)
{
var consumerName = $"reclaimer-{Environment.MachineName}";
var staleThreshold = TimeSpan.FromMinutes(5); // 超过5分钟未ACK的消息视为僵尸消息
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
try
{
var pendingInfo = await _database.StreamPendingAsync(StreamName, GroupName);
if (pendingInfo.PendingMessageCount == 0) continue;
// 获取详细的待处理消息列表
var pendingMessages = await _database.StreamPendingMessagesAsync(
StreamName,
GroupName,
100, // 每次处理100条
RedisValue.Null, // 总是从头开始
RedisValue.Null
);
foreach (var msg in pendingMessages)
{
if (msg.IdleTime > staleThreshold)
{
_logger.Warning("Reclaiming stale message {MessageId} from consumer {Consumer}, idle for {IdleTime}",
msg.MessageId, msg.ConsumerName, msg.IdleTime);
// XCLAIM 将消息所有权转移给新的消费者
await _database.StreamClaimAsync(StreamName, GroupName, consumerName, (long)staleThreshold.TotalMilliseconds, new[] { msg.MessageId });
Datadog.Trace.DogStatsd.Increment("llm.tasks.reclaimed");
}
}
}
catch (Exception ex)
{
_logger.Error(ex, "Error during stale message reclaim process.");
}
}
}
处理失败并移入DLQ的逻辑:
// 在 ProcessEntry 的 catch 块中调用
private async Task HandleProcessingFailure(StreamEntry entry)
{
const int maxRetries = 3;
var messageId = entry.Id;
// 检查此消息的投递次数
var pendingMessages = await _database.StreamPendingMessagesAsync(StreamName, GroupName, 1, consumerName: RedisValue.Null, messageId: messageId, messageId: messageId);
var deliveryCount = pendingMessages.FirstOrDefault()?.DeliveryCount ?? 1;
if (deliveryCount > maxRetries)
{
_logger.Warning("Task {MessageId} failed {DeliveryCount} times, moving to DLQ.", messageId, deliveryCount);
// 1. 将消息内容写入DLQ流
await _database.StreamAddAsync(DeadLetterStreamName, entry.Values);
// 2. 从原流中确认消息,以防再次投递
await _database.StreamAcknowledgeAsync(StreamName, GroupName, messageId);
Datadog.Trace.DogStatsd.Increment("llm.tasks.dlq.moved");
}
else
{
// 允许消息在超时后被自动重新投递
_logger.Information("Task {MessageId} failed, will be retried. Attempt {AttemptCount}/{MaxRetries}", messageId, deliveryCount, maxRetries);
}
}
Datadog:将所有信号关联起来
到目前为止,我们已经构建了一个健壮的管道,并发送了追踪、日志和指标。Datadog的魔力在于将它们无缝地连接起来。
当我们在Datadog APM中查看一条trace时,它会是这样的:
aspnet_core.request
(来自生产者API)-
redis.command
(XADD)
-
llm.process_task
(来自消费者服务)-
llm.api.call
(一个手动的内部span,包裹LLM客户端调用) -
redis.command
(XACK)
-
由于我们在生产者中注入了追踪上下文,并在消费者中续接了它,这两个看似独立的服务中的span会被自动链接成一个完整的分布式追踪。
更进一步,因为我们配置了日志注入,当你在Datadog中查看llm.process_task
这个span时,侧边栏会显示出所有在这个span执行期间由消费者服务产生的日志。你不再需要根据时间戳和模糊的请求ID去手动关联日志和追踪。
最后,我们创建的自定义指标(llm.tasks.enqueued
, llm.tasks.processed
, llm.tasks.processing_time
)可以用来创建仪表盘和告警。例如,我们可以设置一个告警:如果“成功处理的任务数”与“入队的任务数”之间的差距在5分钟内持续扩大,说明消费者可能出了问题,需要立即关注。或者,如果llm.tasks.processing_time
的p99分位数突然升高,说明LLM服务可能存在性能瓶颈。
局限性与未来迭代路径
这个架构解决了最初的同步调用问题,并提供了相当不错的弹性和可观测性,但它并非完美。
首先,Redis本身虽然可以通过Sentinel或Cluster实现高可用,但它仍然是整个系统的核心瓶颈和单点故障源。对于需要跨地域容灾或对数据持久性有金融级要求的系统,可能需要考虑如Kafka这样的更重型的消息系统。
其次,消费者的伸缩目前是手动的。在生产环境中,我们希望消费者数量能根据队列的积压情况自动增减。这可以通过集成KEDA (Kubernetes Event-driven Autoscaling) 实现。KEDA有一个Redis Streams scaler,可以监控流的长度或消费者组的延迟,并据此自动调整消费者Deployment的副本数。
最后,可观测性可以更深入。我们目前追踪的是技术指标,下一步可以加入业务指标。例如,在任务消息中加入user_id
或tenant_id
,然后在Datadog的指标和追踪中将它们作为标签。这样我们就能回答“哪个用户的请求处理最慢?”或“哪个租户消耗了最多的LLM资源?”这类问题,将技术监控提升到业务洞察的层面。