我认为你应该阅读
MSDN:如何实现生产者/消费者数据流模式
我遇到了同样的问题:一个生产者生产商品,而几个消费者消费了它们并决定将它们发送给其他消费者。每个消费者都在异步工作,并且独立于其他消费者。
你的主要任务是生产者。他生产您的其他任务应该处理的项目。带有主要任务代码的类有一个功能:
public async Task ProduceOutputAsync(...)
您的主程序使用以下命令启动此任务:
var producerTask = Task.Run( () => MyProducer.ProduceOutputAsync(...)
一旦这被调用,生产者任务开始产生输出。同时你的主程序可以继续做其他事情,比如启动消费者。
但让我们首先关注 Producer 任务。
生产者任务产生类型为 T 的项目以供其他任务处理。使用实现 ITargetBlock 的对象将它们转移到其他任务。
每次生产者任务完成创建类型 T 的对象时,它都会使用 ITargetBlock.Post 或最好是异步版本将其发送到目标块:
while (continueProducing())
{
T product = await CreateProduct(...)
bool accepted = await this.TargetBlock(product)
// process the return value
}
// if here, nothing to produce anymore. Notify the consumers:
this.TargetBlock.Complete();
生产者需要一个 ITargetBlock <T
>。在我的应用程序中,一个 BufferBlock <T
> 就足够了。检查 MSDN 以了解其他可能的目标。
无论如何,数据流块也应该实现 ISourceBlock <T
>。您的接收器等待输入到达源,获取并处理它。完成后,它可以将结果发送到自己的目标块,并等待下一个输入,直到不再有预期的输入。当然,如果您的消费者不产生输出,它就不必向目标发送任何内容。
等待输入的过程如下:
ISourceBlock`<T`> mySource = ...;
while (await mySource.ReceiveAsync())
{ // a object of type T is available at the source
T objectToProcess = await mySource.ReceiveAsync();
// keep in mind that someone else might have fetched your object
// so only process it if you've got it.
if (objectToProcess != null)
{
await ProcessAsync(objectToProcess);
// if your processing produces output send the output to your target:
var myOutput = await ProduceOutput(objectToprocess);
await myTarget.SendAsync(myOutput);
}
}
// if here, no input expected anymore, notify my consumers:
myTarget.Complete();
- 构建你的生产者
- 构建所有消费者
- 给生产者一个 BufferBlock 以将其输出发送到
- 启动生产者 MyProducer.ProduceOutputAsync(...)
- 当生产者产生输出并将其发送到缓冲区块时:
- 给消费者相同的 BufferBlock
- 将消费者作为单独的任务启动
- await Task.WhenAll(...) 等待所有任务完成。
每个消费者将在听到不再需要输入时立即停止。完成所有任务后,您的 main 函数可以读取结果并返回