我寻找可以通过 n-TransformBlocks 链接到 JoinBlock 的替代方法,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类集合传递到另一个数据流块。
JoinBlock 可以很好地完成这项工作,但仅限于连接最多 3 个源块。它还存在很多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让从 TransformBlocks 返回的 Tasks 并等到所有 TransformBlocks 都有完成的任务才能在接受之前传递Task<item>
?
有什么替代的想法吗?在传递加入的项目集合之前,我可能有 1-20 个这样的转换块,我需要将哪些项目连接在一起。每个变换块保证为每个“变换”的输入项准确返回一个输出项。
编辑:要求澄清:
根据我之前的一个问题,我将 JoinBlocks 设置如下:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}