0

我正在尝试异步和并行模拟两个集合之间的工作,我有一个 ConcurrentQueue 的客户和一个工人的集合。我需要工作人员从队列中带走一个客户对客户执行工作,一旦完成就立即带走另一个客户。

我决定使用基于事件的范例,其中工人集合将对客户执行操作;谁拥有一个在客户完成后触发的事件处理程序;这有望再次触发 DoWork 方法,这样我就可以并行化工作人员以从队列中获取客户。但我不知道如何在 OnCustomerFinished() 中将客户传递给 DoWork!工人显然不应该依赖客户队列

public class Worker
{
    public async Task DoWork(ConcurrentQueue<Customer> cust)
    {
        await Task.Run(() =>
        {
            if (cust.TryDequeue(out Customer temp))
            {
                Task.Delay(5000);
                temp.IsDone = true;
            }
        });
    }

    public void OnCustomerFinished()
    {
        // This is where I'm stuck
        DoWork(~HOW TO PASS THE QUEUE OF CUSTOMER HERE?~);
    }
}

// 编辑 - 这是客户类

 public class Customer
{
    private bool _isDone = false;

    public EventHandler<EventArgs> CustomerFinished;

    public bool IsDone
    {
        private get { return _isDone; }
        set
        {
            _isDone = value;
            if (_isDone)
            {
                OnCustomerFinished();
            }

        }
    }
    protected virtual void OnCustomerFinished()
    {
        if (CustomerFinished != null)
        {
            CustomerFinished(this, EventArgs.Empty);
        }
    }
}
4

1 回答 1

4

.NET 已经拥有 DataFlow 块形式的 pub/sub 和 worker 机制,最近还有 Channels。

数据流

System.Threading.Tasks.Dataflow命名空间中的数据流块是构建工作人员和工作人员管道的“旧”方式(2012 年及以后)。每个块都有一个输入和/或输出缓冲区。发布到块的每条消息都由后台的一个或多个任务处理。对于具有输出的块,每次迭代的输出都存储在输出缓冲区中。

块可以组合成类似于 CMD 或 Powershell 管道的管道,每个块在其自己的任务上运行。

在最简单的情况下,ActionBlock可以用作工作者:

void ProcessCustomer(Customer customer)
{
    ....
}

var block =new ActionBlock<Customer>(cust=>ProcessCustomer(cust));

而已。无需手动出队或轮询。

生产者方法可以开始将客户实例发送到块。它们中的每一个都将按照发布的顺序在后台进行处理:

foreach(var customer in bigCustomerList)
{
    block.Post(customer);
}

完成后,例如当应用程序终止时,生产者只需调用Complete()块并等待任何剩余条目完成。

block.Complete();
await block.Completion;

块也可以使用异步方法。

频道

通道是一种新机制,内置于 .NET Core 3 中,可在以前的 .NET Framework 和 .NET Core 版本中作为NuGet使用。生产者使用 ChannelWriter 写入通道,消费者使用 ChannelReader 从通道读取。在您意识到它允许一些强大的模式之前,这可能看起来有点奇怪。

生产者可能是这样的,例如,生产者以 0.5 秒的延迟“生产”列表中的所有客户:

ChannelReader<Customer> Producer(IEnumerable<Customer> customers,CancellationToken token=default)
{
    //Create a channel that can buffer an infinite number of entries
    var channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    //Start a background task to produce the data
    _ = Task.Run(async ()=>{
        foreach(var customer in customers)
        {
            //Exit gracefully in case of cancellation
            if (token.IsCancellationRequested)
            {
                return;
            }
            await writer.WriteAsync(customer,token);
            await Task.Delay(500);
        }
    },token)
         //Ensure we complete the writer no matter what
         .ContinueWith(t=>writer.Complete(t.Exception);

   return channel.Reader;
}

这有点复杂,但请注意该函数唯一需要返回的是 ChannelReader。取消令牌对于提前终止生产者很有用,例如在超时之后或应用程序关闭时。

当作者完成时,频道的所有读者也将完成。

消费者只需要 ChannelReader 工作:

async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
    while(await reader.WaitToReadAsync(token))
    {
       while(reader.TryRead(out var customer))
       {
           //Process the customer
       }
    }
}

如果编写器完成,WaitToReadAsync将返回false并且循环将退出。

在 .NET Core 3 中,ChannelReader 通过 ReadAllAsync 方法支持 IAsyncEnumerable,使代码更加简单:

async Task Consumer(ChannelReader<Customer> reader,CancellationToken token=default)
{
    await foreach(var customer in reader.ReadAllAsync(token))
    {
           //Process the customer
    }
}

生产者创建的阅读器可以直接传递给消费者:

var customers=new []{......}
var reader=Producer(customers);
await Consumer(reader);

中间步骤可以从前一个通道阅读器读取数据并将数据发布到下一个,例如订单生成器:

ChannelReader<Order> ConsumerOrders(ChannelReader<Customer> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded();
    var writer=channel.Writer;
    //Start a background task to produce the data
    _ = Task.Run(async ()=>{
        await foreach(var customer in reader.ReadAllAsync(token))
        {
           //Somehow create an order for the customer
           var order=new Order(...);
           await writer.WriteAsync(order,token);
        }
    },token)
         //Ensure we complete the writer no matter what
         .ContinueWith(t=>writer.Complete(t.Exception);

   return channel.Reader;
}

同样,我们需要做的就是将读者从一种方法传递到另一种方法

var customers=new []{......}
var customerReader=Producer(customers);
var orderReader=CustomerOrders(customerReader);
await ConsumeOrders(orderReader);
于 2019-09-13T13:26:43.967 回答