
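I am facing the following problem:

I have a stream of Foo objects and stream them to several concurrent in-process tasks/threads, which in turn process the objects and output FooResult objects. Each FooResult contains, among other members, the same Foo that was used to create it. However, not every Foo necessarily leads to a FooResult.

My problem is that I want to pass on, from the whole process, a wrapping object that contains the original Foo and all FooResult objects (if any) that the concurrent tasks created from that Foo.

Note: I currently use TPL Dataflow, where each concurrent process runs in an ActionBlock<Foo> linked to a BroadcastBlock<Foo>. Each ActionBlock uses SendAsync() to send any FooResult it creates to a target dataflow block. Obviously, the concurrent dataflow blocks produce FooResults at unpredictable times, which is what I am currently struggling with. I cannot figure out how many FooResults were created in total across all ActionBlock<Foo> instances, so I cannot bundle them with the originating Foo and pass the bundle on as a wrapping object.

In pseudocode, it currently looks as follows: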

BroadcastBlock<Foo> broadCastBlock;
ActionBlock<Foo> aBlock1;
ActionBlock<Foo> aBlock2;
ActionBlock<FooResult> targetBlock;
broadCastBlock.LinkTo(aBlock1); broadCastBlock.LinkTo(aBlock2);

aBlock1 = new ActionBlock<Foo>(foo =>
{
    // Do something here. Sometimes this creates a FooResult; if so:
    targetBlock.SendAsync(fooResult);
});

// Similar for aBlock2.

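However, the problem with the current code is that targetBlock may receive nothing at all if a Foo does not produce a single FooResult in any of the action blocks. It may also receive two FooResult objects for the same Foo, because each action block produced one.

What I want is for targetBlock to receive exactly one wrapping object per Foo, containing the Foo and, if any FooResult objects were created, those as well.

Any ideas how to make the solution work as described? It does not have to use TPL Dataflow, but it would be neat if it did.

UPDATE: The following is what I got with the JoinBlock implementation suggested by svick. I am not going to use it (unless it can be tweaked performance-wise) because it runs extremely slowly: I get about 89,000 items per second (and those are only int value types).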

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int, int> joinBlock;
    private ActionBlock<Tuple<int, int, int>> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, int>(i =>
            {
                return i;
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        transformBlock2 = new TransformBlock<int, int>(i =>
            {
                return i;
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        joinBlock = new JoinBlock<int, int, int>();

        processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
            {
                //Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });

        broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });

        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }

        // Mark completion.
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        watch.Stop();

        // Multiply before dividing so integer division does not truncate the rate to a multiple of 1000.
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}

Updated code to reflect the suggestions:

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, int>(i =>
            {
                return i;
            });

        transformBlock2 = new TransformBlock<int, int>(i =>
            {
                return i;
            });

        joinBlock = new JoinBlock<int, int>();

        processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
            {
                //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target1);
        transformBlock2.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }

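        // Mark completion. The transform blocks are linked to the join block without
        // PropagateCompletion, so the join block is completed once both have finished.
        broadCastBlock.Complete();
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());

        processorBlock.Completion.Wait();

        watch.Stop();

        // Multiply before dividing so integer division does not truncate the rate to a multiple of 1000.
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);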
        Console.ReadLine();
    }
}

2 Answers


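I can see two ways to solve this:

  1. Use JoinBlock. Your broadcast block and both worker blocks would each send to one target of the join block. If a worker block has no result for an item, it returns null instead (or some other special value). Your worker blocks would need to change to TransformBlock<Foo, FooResult>, because using ActionBlock the way you do does not guarantee ordering (at least not when you set MaxDegreeOfParallelism), whereas TransformBlock does.

    The result of the JoinBlock would be a Tuple<Foo, FooResult, FooResult>, where either or both of the FooResults can be null.

    I am not sure I like that this solution relies so heavily on the items being ordered correctly, though; that seems fragile to me.

  2. Use some other object for synchronization. That object would be responsible for sending the result onward once all blocks have finished with a given item. This is similar to the NotificationWrapper that Mario suggested in his answer.

    In this case, you could use TaskCompletionSource and Task.WhenAll() to take care of the synchronization.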
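The second option can be sketched roughly as follows. This is only an illustration under stated assumptions, not the answerer's implementation: FooEnvelope, FooBundle, EnvelopePipeline, the Id and Producer members, and the "evens"/"threes" worker rules are all hypothetical names invented for the example. Each Foo travels in an envelope holding one TaskCompletionSource per worker; a worker that has nothing to report sets its slot to null, and a bundling block awaits Task.WhenAll() over the slots before passing one wrapper per Foo downstream.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the question's types.
public sealed class Foo { public int Id; }
public sealed class FooResult { public Foo Source; public string Producer; }
public sealed class FooBundle { public Foo Foo; public List<FooResult> Results; }

// Hypothetical envelope: one Foo plus one completion source per worker.
public sealed class FooEnvelope
{
    public Foo Foo { get; }
    public TaskCompletionSource<FooResult>[] Slots { get; }

    public FooEnvelope(Foo foo, int workerCount)
    {
        Foo = foo;
        Slots = new TaskCompletionSource<FooResult>[workerCount];
        for (int i = 0; i < workerCount; i++)
            Slots[i] = new TaskCompletionSource<FooResult>(
                TaskCreationOptions.RunContinuationsAsynchronously);
    }

    // Completes once every worker has reported; null slots (no result) are dropped.
    public async Task<FooBundle> BundleAsync()
    {
        FooResult[] all = await Task.WhenAll(Slots.Select(s => s.Task));
        return new FooBundle { Foo = Foo, Results = all.Where(r => r != null).ToList() };
    }
}

public static class EnvelopePipeline
{
    public static async Task<List<FooBundle>> RunAsync(IEnumerable<Foo> foos)
    {
        var bundles = new List<FooBundle>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var broadcast = new BroadcastBlock<FooEnvelope>(e => e);

        // Made-up worker rules: each worker always fills its own slot, with null for "no result".
        var evens = new ActionBlock<FooEnvelope>(e => e.Slots[0].SetResult(
            e.Foo.Id % 2 == 0 ? new FooResult { Source = e.Foo, Producer = "evens" } : null));
        var threes = new ActionBlock<FooEnvelope>(e => e.Slots[1].SetResult(
            e.Foo.Id % 3 == 0 ? new FooResult { Source = e.Foo, Producer = "threes" } : null));

        // Waits for both workers per item; TransformBlock emits its outputs in input order.
        var bundler = new TransformBlock<FooEnvelope, FooBundle>(e => e.BundleAsync());

        // Default MaxDegreeOfParallelism is 1, so the list is only touched by one thread at a time.
        var target = new ActionBlock<FooBundle>(b => bundles.Add(b));

        broadcast.LinkTo(evens, link);
        broadcast.LinkTo(threes, link);
        broadcast.LinkTo(bundler, link);
        bundler.LinkTo(target, link);

        foreach (var foo in foos)
            broadcast.Post(new FooEnvelope(foo, 2));

        broadcast.Complete();
        await target.Completion;
        return bundles;
    }
}
```

With this shape the target sees exactly one FooBundle per Foo, whether zero, one, or two results were produced, and no null placeholders need to travel through a JoinBlock. The cost is one envelope and two TaskCompletionSource allocations per item, which may matter at the throughput mentioned in the question.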

answered 2012-11-22T22:47:16.503

As I understand the problem:

lock foo
work on foo
if foo has not triggered sending a result
and fooResult exists
   send fooResult
   remember in foo that result has already been sent
unlock foo

Update after the OP's comment

So push foo into your broadcast block:

BroadcastBlock<Foo> bcb = new BroadcastBlock<Foo>(foo);
...

if ( aBlock1.HasResult ) 
{
    bcb.Add( aBlock1.Result );
}

if ( aBlock2.HasResult ) 
{
    bcb.Add( aBlock2.Result );
}

Now you can query bcb for its contents and send whatever is needed (or just send bcb itself).

Update (after more discussion in the comments)

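class NotificationWrapper<TSource, TResult>
{
    private readonly TSource originalSource;

    private readonly Queue<TResult> resultsGenerated = new Queue<TResult>();

    // Guards workerCount and resultsGenerated; a private object avoids locking on `this`.
    private readonly object sync = new object();

    private int workerCount;

    public NotificationWrapper(TSource originalSource, int workerCount)
    {
        this.originalSource = originalSource;
        this.workerCount = workerCount;
    }

    // Called by a worker that finished without producing a result.
    public void NotifyActionDone()
    {
        lock (sync)
        {
            --workerCount;
            if (0 == workerCount)
            {
                // The last worker is done: do my sending.
                // send(...) is a placeholder for whatever passes the bundle on.
                send(originalSource, resultsGenerated);
            }
        }
    }

    // Called by a worker that finished and produced a result.
    public void NotifyActionDone(TResult result)
    {
        lock (sync)
        {
            resultsGenerated.Enqueue(result);
            NotifyActionDone(); // C# locks are reentrant, so the nested lock is safe.
        }
    }
}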

In the calling code (pseudocode; the real ActionBlock constructor takes a delegate, not these arguments):

NotificationWrapper<Foo, FooResult> notificationWrapper = new NotificationWrapper<Foo, FooResult>( foo, 2 );
ActionBlock<Foo> ab1 = new ActionBlock<Foo>( foo, notificationWrapper );
ActionBlock<Foo> ab2 = new ActionBlock<Foo>( foo, notificationWrapper );

And each ActionBlock needs to change so that it calls NotifyActionDone() or NotifyActionDone( FooResult ) once it has finished its computation.
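Since the calling code above is pseudocode, here is one way the same countdown idea might be wired into real dataflow blocks. It is a sketch under assumptions, not the answer's exact design: FooTracker, FooBundle, TrackerDemo, the Id and Producer members, and the "evens"/"threes" worker rules are hypothetical, and the lock is replaced by Interlocked.Decrement plus a ConcurrentQueue. One tracker is created per Foo and is itself the message that flows through the blocks, so every worker reports on the item it is actually processing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the question's types.
public sealed class Foo { public int Id; }
public sealed class FooResult { public Foo Source; public string Producer; }
public sealed class FooBundle { public Foo Foo; public IReadOnlyCollection<FooResult> Results; }

// Hypothetical per-item tracker: counts down once per worker and
// invokes a callback when the last worker has reported.
public sealed class FooTracker
{
    private readonly ConcurrentQueue<FooResult> results = new ConcurrentQueue<FooResult>();
    private readonly Action<FooBundle> onAllDone;
    private int remaining;

    public Foo Foo { get; }

    public FooTracker(Foo foo, int workerCount, Action<FooBundle> onAllDone)
    {
        Foo = foo;
        remaining = workerCount;
        this.onAllDone = onAllDone;
    }

    // Every worker calls this exactly once per item; pass null for "no result".
    public void Report(FooResult result)
    {
        if (result != null) results.Enqueue(result);
        if (Interlocked.Decrement(ref remaining) == 0)
            onAllDone(new FooBundle { Foo = Foo, Results = results.ToArray() });
    }
}

public static class TrackerDemo
{
    // Returns, for each Foo id, how many FooResults ended up in its bundle.
    public static Dictionary<int, int> Run(int count)
    {
        var resultCounts = new Dictionary<int, int>();

        // Default MaxDegreeOfParallelism is 1, so the dictionary needs no lock.
        var target = new ActionBlock<FooBundle>(b => resultCounts[b.Foo.Id] = b.Results.Count);

        var broadcast = new BroadcastBlock<FooTracker>(t => t);

        // Made-up worker rules for the demonstration.
        var evens = new ActionBlock<FooTracker>(t => t.Report(
            t.Foo.Id % 2 == 0 ? new FooResult { Source = t.Foo, Producer = "evens" } : null));
        var threes = new ActionBlock<FooTracker>(t => t.Report(
            t.Foo.Id % 3 == 0 ? new FooResult { Source = t.Foo, Producer = "threes" } : null));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(evens, link);
        broadcast.LinkTo(threes, link);

        for (int id = 1; id <= count; id++)
            broadcast.Post(new FooTracker(new Foo { Id = id }, 2, bundle => target.Post(bundle)));

        broadcast.Complete();

        // The target is fed from inside the workers, so complete it only after both have finished.
        Task.WhenAll(evens.Completion, threes.Completion).Wait();
        target.Complete();
        target.Completion.Wait();
        return resultCounts;
    }
}
```

Compared with locking, the atomic decrement keeps the per-item cost small, but the contract is stricter: each worker must call Report exactly once per item, even when it has nothing to contribute, or the bundle is never sent.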

answered 2012-11-21T16:15:24.753