c# 如何用 Channel 实现一个批处理(Batching)的后台服务

直接用 Channel 做批处理易丢数据,因其无“凑够N条”或“超时提交”语义;需封装定时器+批量读取逻辑,并确保 FlushBatchAsync 支持取消、不吞异常、清空列表,且生命周期与 IHostedService 对齐。

为什么直接用 Channel 做批处理容易丢数据

因为 Channel 本身不提供“等凑够 N 条再发”或“超时强制提交”的语义。你如果只靠 Channel.Reader.ReadAsync() 一条条读,就退化成单条处理;如果自己加循环 TryRead 拼批次,又得手动管超时、取消、边界条件——稍不留神,Writer 关闭时未读完的数据就丢了,或者批次卡住不触发。

核心矛盾在于:Channel 是流式传输原语,不是批处理原语。必须在它之上封装一层协调逻辑。

Channel + Timer 实现可靠批处理的关键点

推荐用一个后台 Task 持续从 Channel.Reader 尝试批量读取,同时用 System.Threading.Timer 触发“兜底提交”。注意三点:

  • Timer 的回调必须是线程安全的,且不能阻塞(比如别在里面 await 或调 long-running 方法)
  • 每次读取前检查 Reader.Completion.IsCompleted,避免在 Channel 关闭后还尝试读
  • 批次收集必须用 List 而非数组,且每次提交后要清空,不能复用引用(否则并发写会乱)
private async Task BatchingLoopAsync(CancellationToken ct)
{
    var batch = new List();
    var timer = new Timer(_ => { _ = FlushBatchAsync(batch, ct); }, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

    try
    {
        while (!ct.IsCancellationRequested && await _channel.Reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var item))
            {
                batch.Add(item);
                if (batch.Count >= _batchSize)
                {
                    await FlushBatchAsync(batch, ct).ConfigureAwait(false);
                    batch.Clear();
                    timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
                }
            }

            // 每次有新数据进来,重置定时器(实现“最后一条进来后等 flushInterval 再提交”)
            timer.Change(_flushInterval, Timeout.InfiniteTimeSpan);
        }
    }
    finally
    {
        timer.Dispose();
        if (batch.Count > 0)
            await FlushBatchAsync(batch, ct).ConfigureAwait(false);
    }
}

FlushBatchAsync 必须支持取消且不能吞异常

这是最容易出问题的一环:如果 FlushBatchAsync 里调用的是外部 HTTP API 或数据库写入,它可能耗时、可能失败、可能被取消。必须显式传递 CancellationToken,并在 catch 块中区分 OperationCanceledException 和其他异常。

  • 遇到 OperationCanceledException:直接 return,不要重试,因为上层已要求停止
  • 遇到其他异常:记录日志,但不要 throw —— 否则整个 BatchingLoopAsync 会退出,后续数据全丢
  • 如果需要重试,应在 FlushBatchAsync 内部做(比如用 Polly),而不是让外层循环崩溃
private async Task FlushBatchAsync(List batch, CancellationToken ct)
{
    try
    {
        await _httpClient.PostAsJsonAsync("/api/batch", batch, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // 正常退出路径,不记日志
        return;
    }
    catch (Exception ex) when (!(ex is OperationCanceledException))
    {
        _logger.LogError(ex, "Failed to flush batch of {Count} items", batch.Count);
        // 不 throw,继续下一轮
    }
    finally
    {
        batch.Clear(); // 确保清空,避免引用残留
    }
}

注册为 IHostedService 时要注意生命周期绑定

Channel 的 WriterReader 都需要和宿主生命周期对齐。常见错误是把 Channel.CreateBounded() 放在构造函数里,但没在 StopAsync 中显式调用 Writer.Complete(),导致 BatchingLoopAsync 永远等在 WaitToReadAsync 上,服务无法正常退出。

  • StartAsync 中启动 BatchingLoopAsync 并用 Task.RunBackgroundService 托管
  • StopAsync 中先调 _channel.Writer.Complete(),再 await _batchingTask 等它自然结束
  • 别在 Dispose 里做任何异步清理——IHostedService 的 Dispose 是同步的

真正难的是边界情况:比如 StopAsync 被调用时,FlushBatchAsync 正在发请求,这时 CancelToken 触发,你得确保那个 HTTP 请求真能被取消(HttpClient 默认支持),而不是留下悬挂连接。