使用 C#、Redis Streams 和 Datadog 构建可观测的 LLM 推理任务管道


一个看似简单的同步 LLM 调用是许多生产事故的开端。

// 这是最直接的方式,也是最脆弱的方式。
[ApiController]
[Route("api/generate")]
public class GenerationController : ControllerBase
{
    private readonly LlmApiClient _llmClient;

    public GenerationController(LlmApiClient llmClient)
    {
        _llmClient = llmClient;
    }

    [HttpPost]
    public async Task<IActionResult> Generate([FromBody] string prompt)
    {
        // 阻塞前端请求,直到LLM返回结果
        // LLM服务慢?整个API跟着慢。
        // LLM服务挂了?API直接返回500。
        // 瞬时流量高峰?直接打垮LLM服务。
        var result = await _llmClient.GenerateTextAsync(prompt);
        return Ok(result);
    }
}

在真实项目中,这种代码无法存活。它将HTTP请求的生命周期与一个缓慢且不可预测的下游依赖(LLM API)紧密耦合。我们需要解耦,需要缓冲,需要一种机制来处理失败、重试和横向扩展。这自然引出了基于队列的异步处理架构。

我们的技术栈是.NET,因此第一反应可能是RabbitMQ或Kafka。但对于许多场景,这两种选择都显得过于重型。我们需要的是一个轻量、持久化、支持消费者组且性能优异的中间件。Redis Streams 恰好填补了这个生态位。它不像Redis Pub/Sub那样发完即忘,也不像Redis List那样需要复杂的逻辑来模拟消费者组,它提供了恰到好处的持久化和消费模型。

本文的目标,就是从零开始,用C#构建一个围绕Redis Streams的、生产级的LLM推理任务管道,并利用Datadog实现从请求入口到任务完成的全链路深度可观测性。我们将不仅关注“如何实现”,更关注“为何如此设计”以及如何处理现实世界中的故障。

架构蓝图:解耦与缓冲

在动手写代码前,我们先用流程图定义清晰的系统边界和数据流。

graph TD
    subgraph "Producer Service (ASP.NET Core API)"
        A[HTTP POST /api/tasks] --> B{Request Validation};
        B --> C[Generate Task ID];
        C --> D[Construct Stream Message];
        D --> |XADD llm:tasks| E(Redis Stream);
    end

    subgraph "Consumer Service (Background Worker)"
        E --> |XREADGROUP GroupA Consumer1| F[Consumer Instance 1];
        E --> |XREADGROUP GroupA Consumer2| G[Consumer Instance 2];
        E --> |...| H[...];

        F --> I{LLM API Call};
        G --> I;

        I --> J{Process Result};
        J -- Success --> K[XACK];
        J -- Failure --> L{Retry Logic};
        L -- Max Retries Reached --> M[Move to DLQ Stream];
        M --> |XADD llm:tasks:dlq| N(Dead Letter Stream);
        L -- Can Retry --> K[XACK];
    end

    subgraph "Observability Platform (Datadog)"
        A -- Trace --> O(APM);
        F -- Trace --> O;
        G -- Trace --> O;
        A -- Logs --> P(Logs);
        F -- Logs --> P;
        G -- Logs --> P;
        F -- Metrics --> Q(Metrics);
        G -- Metrics --> Q;
    end

    style E fill:#f9f,stroke:#333,stroke-width:2px
    style N fill:#ff6347,stroke:#333,stroke-width:2px

这个架构的核心思想是:

  1. 生产者(API服务) 的职责非常单一:验证请求,将其转化为一个持久化的任务消息,然后快速响应客户端。它不关心任务何时、由谁执行。
  2. Redis Stream (llm:tasks) 作为任务的缓冲和持久化层。
  3. 消费者(后台服务) 组成一个消费者组 (GroupA),它们竞争性地从流中拉取任务。这天然地实现了负载均衡和高可用:一个消费者宕机,其他消费者会自动接管其未完成的任务。
  4. 死信队列 (llm:tasks:dlq) 用于隔离处理失败的“毒丸”消息,防止它们阻塞整个处理流程。
  5. Datadog 贯穿始终,捕获每个环节的追踪、指标和日志数据,并将它们关联起来。

Phase 1: 任务生产者实现

生产者是一个ASP.NET Core Web API。它的关键是与StackExchange.Redis的集成。

依赖配置与连接管理

一个常见的错误是为每个请求创建新的ConnectionMultiplexer实例。这是极其昂贵的操作。ConnectionMultiplexer被设计为单例,在整个应用程序生命周期内共享。

// Program.cs
using StackExchange.Redis;

// ... other services

var redisConnectionString = builder.Configuration.GetConnectionString("Redis");
if (string.IsNullOrEmpty(redisConnectionString))
{
    throw new InvalidOperationException("Redis connection string is not configured.");
}

// 注册为单例
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect(redisConnectionString)
);
builder.Services.AddSingleton<ITaskProducer, RedisTaskProducer>();

// ... Datadog APM and Logging configuration would go here

生产者核心代码

生产者接口很简单,但实现细节很重要,尤其是错误处理和日志记录。

// ITaskProducer.cs
public interface ITaskProducer
{
    Task<string> EnqueueTaskAsync(string prompt, Dictionary<string, string>? metadata = null);
}

// RedisTaskProducer.cs
using Serilog;
using StackExchange.Redis;
using System.Diagnostics;

public class RedisTaskProducer : ITaskProducer
{
    private readonly IConnectionMultiplexer _redis;
    private readonly IDatabase _database;
    private readonly ILogger _logger = Log.ForContext<RedisTaskProducer>();

    // Redis stream and consumer group names
    private const string StreamName = "llm:tasks";

    public RedisTaskProducer(IConnectionMultiplexer redis)
    {
        _redis = redis;
        _database = _redis.GetDatabase();
    }

    public async Task<string> EnqueueTaskAsync(string prompt, Dictionary<string, string>? metadata = null)
    {
        var taskId = Guid.NewGuid().ToString("N");
        
        // Datadog APM: 获取当前活动的 span
        var activeSpan = Datadog.Trace.Tracer.Instance.ActiveScope?.Span;

        var message = new NameValueEntry[]
        {
            new("task_id", taskId),
            new("prompt", prompt),
            new("enqueue_time_utc", DateTime.UtcNow.ToString("o")),
            // 关键:注入追踪上下文,用于跨进程传播
            new("dd_trace_id", activeSpan?.TraceId.ToString() ?? "0"),
            new("dd_span_id", activeSinactiveSpan?.SpanId.ToString() ?? "0")
        };

        try
        {
            // XADD 命令将新条目添加到流的末尾
            var messageId = await _database.StreamAddAsync(StreamName, message);
            
            _logger.Information(
                "Task {TaskId} enqueued to stream {StreamName} with message ID {MessageId}", 
                taskId, StreamName, messageId);

            // 使用 Datadog 的 DogStatsD 客户端发送自定义指标
            Datadog.Trace.DogStatsd.Increment("llm.tasks.enqueued", tags: new[] { "stream:llm-tasks" });

            return taskId;
        }
        catch (RedisConnectionException ex)
        {
            _logger.Error(ex, "Failed to enqueue task {TaskId} due to Redis connection error.", taskId);
            Datadog.Trace.DogStatsd.Increment("llm.tasks.enqueue.errors", tags: new[] { "reason:connection" });
            throw; // 向上抛出,让上层处理(例如返回 503 Service Unavailable)
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "An unexpected error occurred while enqueuing task {TaskId}.", taskId);
            Datadog.Trace.DogStatsd.Increment("llm.tasks.enqueue.errors", tags: new[] { "reason:unknown" });
            throw;
        }
    }
}

这段代码有几个关键点:

  1. 注入追踪上下文: 这是实现分布式追踪的核心。我们将Datadog的trace_idspan_id作为消息的一部分放入Stream。消费者稍后会提取这些ID来续接追踪链。
  2. 结构化日志: 使用Serilog,并记录关键的上下文信息如TaskIdStreamName。当配置了Datadog日志集成后,这些日志会自动与对应的Trace关联。
  3. 自定义指标: DogStatsd.Increment用于发送一个计数器指标。我们可以在Datadog仪表盘上实时看到任务入队的速率和失败情况。

Phase 2: 可靠的消费者实现

消费者是一个后台服务,它使用IHostedService实现,随应用程序启动而运行。

消费者组的设置

消费者组只需要创建一次。我们可以在消费者服务启动时进行检查和创建。

// 在消费者服务的启动逻辑中
var db = _redis.GetDatabase();
const string streamName = "llm:tasks";
const string groupName = "llm-processor-group";

try
{
    if (!(await db.KeyExistsAsync(streamName)) || (await db.StreamGroupInfoAsync(streamName)).All(g => g.Name != groupName))
    {
        // 从流的开始创建组,"$": 从末尾开始,"0-0": 从头开始
        await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", createStream: true);
        _logger.Information("Created Redis Stream consumer group {GroupName} for stream {StreamName}", groupName, streamName);
    }
}
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
{
    // 忽略组已存在的错误,这是并发场景下的正常现象
    _logger.Warning("Consumer group {GroupName} already exists.", groupName);
}

消费循环与故障处理

消费者的核心是一个无限循环,它使用XREADGROUP阻塞式地等待新消息。

// LlmTaskConsumerService.cs (IHostedService implementation)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.Information("LLM Task Consumer Service is starting.");
    
    // ... 消费者组创建逻辑 ...

    var consumerName = $"consumer-{Environment.MachineName}-{Process.GetCurrentProcess().Id}";

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            // ">" 表示只读取从未传递给该消费者的任何其他消费者的消息
            var entries = await _database.StreamReadGroupAsync(StreamName, GroupName, consumerName, ">", count: 1, milliseconds: 5000);

            if (entries.Length == 0)
            {
                continue; // 没有新消息,继续等待
            }

            foreach (var entry in entries)
            {
                await ProcessEntry(entry, consumerName, stoppingToken);
            }
        }
        catch (RedisConnectionException ex)
        {
            _logger.Error(ex, "Redis connection lost. Retrying in 5 seconds...");
            await Task.Delay(5000, stoppingToken);
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "Unhandled exception in consumer loop. Restarting loop.");
            await Task.Delay(1000, stoppingToken);
        }
    }
}

ProcessEntry方法是处理单个任务的核心。这里的健壮性设计至关重要。

private async Task ProcessEntry(StreamEntry entry, string consumerName, CancellationToken stoppingToken)
{
    var messageId = entry.Id;
    var task = entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
    task.TryGetValue("task_id", out var taskId);

    // 续接分布式追踪
    Datadog.Trace.TraceContext? propagatedContext = null;
    if (task.TryGetValue("dd_trace_id", out var traceIdStr) && ulong.TryParse(traceIdStr, out var traceId) &&
        task.TryGetValue("dd_span_id", out var parentIdStr) && ulong.TryParse(parentIdStr, out var parentId))
    {
        propagatedContext = new Datadog.Trace.TraceContext(traceId, parentId);
    }

    using (var scope = Datadog.Trace.Tracer.Instance.StartActive("llm.process_task", new Datadog.Trace.SpanCreationSettings { Parent = propagatedContext }))
    {
        var span = scope.Span;
        span.ResourceName = $"{GroupName}:{consumerName}";
        span.SetTag("task_id", taskId);
        span.SetTag("redis.stream", StreamName);
        span.SetTag("redis.message_id", messageId);

        var log = _logger.ForContext("TaskId", taskId);

        try
        {
            log.Information("Processing task.");
            
            var prompt = task["prompt"];
            
            // 模拟LLM调用
            var result = await _llmApiClient.GenerateAsync(prompt, stoppingToken);
            
            log.Information("LLM call successful. Result length: {Length}", result.Length);

            // 业务处理完成,确认消息
            await _database.StreamAcknowledgeAsync(StreamName, GroupName, messageId);

            Datadog.Trace.DogStatsd.Increment("llm.tasks.processed", tags: new[] { "status:success" });
            Datadog.Trace.DogStatsd.Distribution("llm.tasks.processing_time", (DateTime.UtcNow - span.StartTime.UtcDateTime).TotalMilliseconds);
        }
        catch (Exception ex)
        {
            span.SetException(ex);
            log.Error(ex, "Failed to process task.");
            Datadog.Trace.DogStatsd.Increment("llm.tasks.processed", tags: new[] { "status:failure" });
            
            // 在真实项目中,这里应该有更复杂的重试和DLQ逻辑。
            // 为简化,我们直接确认,避免无限重试。
            // 正确的做法是检查重试次数,如果超限则移入DLQ。
            await HandleProcessingFailure(entry);
        }
    }
}

Phase 3: 实现死信队列(DLQ)与僵尸消息处理

仅仅捕获异常是不够的。如果一条消息本身有问题(例如,格式错误导致解析失败),它会被反复消费、反复失败,这就是“毒丸”消息。消费者组有一个特性:如果消息被消费但没有被XACK,在超时后它会重新分配给组内的其他消费者。这本是为容错设计的,但遇上毒丸消息就成了灾难。

我们需要一个DLQ机制。当一条消息处理失败达到预设次数后,就不再重试,而是将其转移到一个专门的llm:tasks:dlq流中,等待人工干预或特殊逻辑处理。

同时,我们还需要处理“僵尸消息”:一个消费者拉取了消息,但在处理完成(XACK)前就崩溃了。这条消息会一直处于“待处理”(Pending)状态。我们需要一个独立的清理进程,定期检查这些僵尸消息,并将它们重新分配给活跃的消费者。

// 僵尸消息清理逻辑 (可以放在另一个IHostedService中)
public async Task ReclaimStalePendingMessagesAsync(CancellationToken stoppingToken)
{
    var consumerName = $"reclaimer-{Environment.MachineName}";
    var staleThreshold = TimeSpan.FromMinutes(5); // 超过5分钟未ACK的消息视为僵尸消息

    while (!stoppingToken.IsCancellationRequested)
    {
        await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);

        try
        {
            var pendingInfo = await _database.StreamPendingAsync(StreamName, GroupName);
            if (pendingInfo.PendingMessageCount == 0) continue;

            // 获取详细的待处理消息列表
            var pendingMessages = await _database.StreamPendingMessagesAsync(
                StreamName, 
                GroupName, 
                100, // 每次处理100条
                RedisValue.Null, // 总是从头开始
                RedisValue.Null
            );

            foreach (var msg in pendingMessages)
            {
                if (msg.IdleTime > staleThreshold)
                {
                    _logger.Warning("Reclaiming stale message {MessageId} from consumer {Consumer}, idle for {IdleTime}", 
                        msg.MessageId, msg.ConsumerName, msg.IdleTime);
                    
                    // XCLAIM 将消息所有权转移给新的消费者
                    await _database.StreamClaimAsync(StreamName, GroupName, consumerName, (long)staleThreshold.TotalMilliseconds, new[] { msg.MessageId });
                    Datadog.Trace.DogStatsd.Increment("llm.tasks.reclaimed");
                }
            }
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "Error during stale message reclaim process.");
        }
    }
}

处理失败并移入DLQ的逻辑:

// 在 ProcessEntry 的 catch 块中调用
private async Task HandleProcessingFailure(StreamEntry entry)
{
    const int maxRetries = 3;
    var messageId = entry.Id;

    // 检查此消息的投递次数
    var pendingMessages = await _database.StreamPendingMessagesAsync(StreamName, GroupName, 1, consumerName: RedisValue.Null, messageId: messageId, messageId: messageId);
    var deliveryCount = pendingMessages.FirstOrDefault()?.DeliveryCount ?? 1;

    if (deliveryCount > maxRetries)
    {
        _logger.Warning("Task {MessageId} failed {DeliveryCount} times, moving to DLQ.", messageId, deliveryCount);
        
        // 1. 将消息内容写入DLQ流
        await _database.StreamAddAsync(DeadLetterStreamName, entry.Values);
        
        // 2. 从原流中确认消息,以防再次投递
        await _database.StreamAcknowledgeAsync(StreamName, GroupName, messageId);

        Datadog.Trace.DogStatsd.Increment("llm.tasks.dlq.moved");
    }
    else
    {
        // 允许消息在超时后被自动重新投递
        _logger.Information("Task {MessageId} failed, will be retried. Attempt {AttemptCount}/{MaxRetries}", messageId, deliveryCount, maxRetries);
    }
}

Datadog:将所有信号关联起来

到目前为止,我们已经构建了一个健壮的管道,并发送了追踪、日志和指标。Datadog的魔力在于将它们无缝地连接起来。

当我们在Datadog APM中查看一条trace时,它会是这样的:

  1. aspnet_core.request (来自生产者API)
    • redis.command (XADD)
  2. llm.process_task (来自消费者服务)
    • llm.api.call (一个手动的内部span,包裹LLM客户端调用)
    • redis.command (XACK)

由于我们在生产者中注入了追踪上下文,并在消费者中续接了它,这两个看似独立的服务中的span会被自动链接成一个完整的分布式追踪。

更进一步,因为我们配置了日志注入,当你在Datadog中查看llm.process_task这个span时,侧边栏会显示出所有在这个span执行期间由消费者服务产生的日志。你不再需要根据时间戳和模糊的请求ID去手动关联日志和追踪。

最后,我们创建的自定义指标(llm.tasks.enqueued, llm.tasks.processed, llm.tasks.processing_time)可以用来创建仪表盘和告警。例如,我们可以设置一个告警:如果“成功处理的任务数”与“入队的任务数”之间的差距在5分钟内持续扩大,说明消费者可能出了问题,需要立即关注。或者,如果llm.tasks.processing_time的p99分位数突然升高,说明LLM服务可能存在性能瓶颈。

局限性与未来迭代路径

这个架构解决了最初的同步调用问题,并提供了相当不错的弹性和可观测性,但它并非完美。

首先,Redis本身虽然可以通过Sentinel或Cluster实现高可用,但它仍然是整个系统的核心瓶颈和单点故障源。对于需要跨地域容灾或对数据持久性有金融级要求的系统,可能需要考虑如Kafka这样的更重型的消息系统。

其次,消费者的伸缩目前是手动的。在生产环境中,我们希望消费者数量能根据队列的积压情况自动增减。这可以通过集成KEDA (Kubernetes Event-driven Autoscaling) 实现。KEDA有一个Redis Streams scaler,可以监控流的长度或消费者组的延迟,并据此自动调整消费者Deployment的副本数。

最后,可观测性可以更深入。我们目前追踪的是技术指标,下一步可以加入业务指标。例如,在任务消息中加入user_idtenant_id,然后在Datadog的指标和追踪中将它们作为标签。这样我们就能回答“哪个用户的请求处理最慢?”或“哪个租户消耗了最多的LLM资源?”这类问题,将技术监控提升到业务洞察的层面。


  目录