我面临以下问题:
我有一个对象数据流,Foo
并将这些对象流式传输到几个并发的进程内任务/线程,这些任务/线程依次处理对象和输出FooResult
对象。每个都FooResult
包含在其他成员Foo
中与创建FooResult
. 但是,并非每个人都Foo
必须创建一个FooResult
.
我的问题是我想从整个过程中传递一个包装对象,该对象包含原始对象和可能从并发任务中创建的Foo
所有对象(如果有的话) 。FooResult
Foo
注意:我目前使用 TPL 数据流,而每个并发进程都发生在ActionBlock<Foo>
从BroadCastBlock<Foo>
. 它用于SendAsync()
向目标数据流块发送可能创建FooResult
的 . 显然,并发数据流块FooResult
在不可预测的时间产生,这是我目前正在努力解决的问题。我似乎无法弄清楚FooResult
总共创建了多少个ActionBlock<Foo>
,以便我可以将它们与原始对象捆绑在一起Foo
并将其作为包装对象传递。
在伪代码中,它目前看起来如下:
BroadCastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2;
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);
aBlock1 = new ActionBlock<Foo>(foo =>
{
//do something here. Sometimes create a FooResult. If then
targetBlock.SendAsync(fooResult);
});
//similar for aBlock2
Foo
但是,当前代码的问题在于,如果 a没有FooResult
在任何操作块中生成单个,则 targetBlock 可能不会收到任何内容。此外,可能 targetBlock 接收 2 个FooResult
对象,因为每个动作块产生一个FooResult
.
我想要的是 targetBlock 接收一个包含每个对象的包装对象,Foo
如果FooResult
创建了对象,那么也是一个FooResult
.
有什么想法可以使解决方案按照描述的方式工作吗?它不必仔细阅读 TPL 数据流,但如果这样做会很整洁。
更新:以下是我通过 svick 建议的 JoinBlock 实现得到的。我不会使用它(除非它可以在性能方面进行调整),因为它运行起来非常慢,我每秒可以处理大约 89000 个项目(而且那只是 int 值类型)。
public class Test
{
private BroadcastBlock<int> broadCastBlock;
private TransformBlock<int, int> transformBlock1;
private TransformBlock<int, int> transformBlock2;
private JoinBlock<int, int, int> joinBlock;
private ActionBlock<Tuple<int, int, int>> processorBlock;
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
joinBlock = new JoinBlock<int, int, int>();
processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
{
//Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
}
更新代码以反映建议:
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, int>(i =>
{
return i;
});
transformBlock2 = new TransformBlock<int, int>(i =>
{
return i;
});
joinBlock = new JoinBlock<int, int>();
processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
{
//Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(joinBlock.Target1);
transformBlock2.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
Stopwatch watch = new Stopwatch();
watch.Start();
const int numElements = 1000000;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.Post(i);
}
////mark completion
broadCastBlock.Complete();
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
processorBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
Console.ReadLine();
}
}