1

我有一个线程负责入队,一个线程负责出队。但是,数据入队的频率远远超过出队+处理数据所需的时间。当我执行以下操作时,我最终在数据处理中遇到了巨大的延迟:

public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.Count > 0)
            {
                Queue.TryDequeue(out item);
                ProcessData(item);
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //...
        }
    }
}

接下来我尝试在单独的任务中处理数据,但这最终影响了项目中的其他任务,因为这种处理最终占用了分配给应用程序的大部分资源并产生了高线程数。

public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.Count > 0)
            {
                Queue.TryDequeue(out item);
                Task.Run(() => ProcessData(item));
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //
        }
    }
}

接下来,我尝试了以下方法:

public void HandleData()
{
    List<Task> taskList = new List<Task>();
    while (true)
    {
        try
        {
            if (Queue.Count > 0)
            {
                Queue.TryDequeue(out item);
                if (taskList.Count <= 20)
                {
                    Task t = Task.Run(() => ProcessData(item));
                    taskList.Add(t);
                }
                else
                {
                    ProcessData(item);
                }
            }
            else
            {
                Thread.Sleep(10);
            }
            taskList.RemoveAll(x => x.IsCompleted);
        }
        catch (Exception e)
        {
            //...
        }
    }
}

这似乎已经解决了这个问题,但我想知道是否有更清洁的方法来做到这一点?出队时设置最大并发线程数的方法?

4

2 回答 2

1

ConcurrentQueue不是合适的容器,特别是因为它不提供异步操作。更好的选择是使用ActionBlockChannel结合Parallel.ForEachAsync

使用动作块

ActionBlock 结合了输入队列和工作线程来异步处理数据。工作人员会在数据可用时立即对其进行处理。使用 ActionBlock,您可以创建具有一定数量工作人员的块并开始向其发布数据。该块将仅使用配置数量的工作任务来处理数据:


ActionBlock<Data> _block;

public void Initialize()
{
    var options=new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20
    }

    _block =new ActionBlock(ProcessData,options);

}

使用PostSendAsync方法将数据/消息发布到块。当没有更多数据时,Complete方法会告诉块在处理完任何待处理的项目后关闭。我们可以通过等待Completion属性来等待待处理的项目完成

public async Task Produce(CancellationToken cancel)
{
    while(!cancel.IsCancellationRequested)
    {
        var data=ProduceSomething();
        _block.Post(data);
    }
    _block.Complete();

    await _block.Completion;
}

使用频道

另一种选择是使用 aChannel而不是 a ConcurrentQueue。这个类相当于一个异步的 ConcurrentQueue,它提供了一个IAsyncEnumerable<T>可以用await foreach迭代的。您可以创建特定数量的工作人员以从容器本身或通过IAsyncEnumerable<T>流读取。Parallel.ForEachAsync在 .NET 6 中,最后一部分使用固定并行度选项更容易:


ChannelReader<T> Producer(CancellationToken token)
{
    var channel=Channel.CreateUnbounded<T>();

    var writer=channel.Writer;

    while(!token.IsCancellationRequested)
    {
        var someDate=ProduceData();
        channel.WriteAsync(someData);
    }
    writer.Complete();
    
    return channel.Reader;
}    


async Task Consumer<T>(ChannelReader<T> input,int dop=20)
{
    ParallelOptions parallelOptions = new()
    {
        MaxDegreeOfParallelism = dop
    };
 
    await Parallel.ForEachAsync(input.ReadAllAsync(), options,
                                data=>ProcessData(data));
}

于 2022-02-17T11:19:27.890 回答
1

ActionBlock<T>更简洁的解决方案是使用TPL Dataflow库,它是 .NET 标准库(从 .NET Core 开始)的一部分。该组件对其接收到的每个项目调用一个操作。它在内部有自己的输入队列,默认情况下是无界的,并使用可配置的来处理项目,MaxDegreeOfParallelism默认情况下为 1。这是您如何使用它的方法。首先创建块:

var block = new ActionBlock<Item>(item =>
{
    ProcessData(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded // Configurable
});

然后你可以在任何你喜欢的时候用工作来喂它:

block.Post(new Item());

最后,当您即将终止程序时,将块标记为已完成,然后await(或Wait)将块标记为完成以干净终止:

block.Complete();
await block.Completion;

注意:如果ProcessData失败一次,ActionBlock<T>将停止接受更多项目(该Post方法将返回false而不是返回true)。如果您希望它是防故障的,您应该并处理内部(或调用)内部的catch所有错误。ProcessDataAction<T>ProcessData

于 2022-02-17T11:29:50.970 回答