我认为这是非常基本的方法,但我还没有找到任何例子。我有一个生产者和一个消费者,我想在至少处理 x 个对象时完成管道。此外,我需要知道收到了哪些对象。
我就是这样做的:
public class BlockTester
{
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
_worker = new TransformBlock<int, int>(s => s + s);
var buffer = new BufferBlock<int>();
var consumeTask = Consume(buffer);
_worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});
foreach (var value in Enumerable.Range(0,100))
{
_worker.Post(value);
}
_worker.Complete();
await buffer.Completion;
if(buffer.TryReceiveAll(out var received))
{
Console.WriteLine(string.Join(", ", received));
}
}
public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
{
var received = new List<int>();
while (await buffer.OutputAvailableAsync())
{
var current = buffer.Receive();
received.Add(current);
if (current > 25)
{
_worker.Complete();
}
}
return received;
}
}
我对 buffer.TryReceiveAll 有点困惑。等待消费任务和 TryReceiveAll 有什么区别?为什么 TryReceiveAll 在我的场景中为假?我想我实现目标的方法仍然有问题。