-1

在下面的测试代码中,我期待这个结果:

1, 2000
2, 4000
3, 6000

然而实际结果是:

3, 6000    
2, 4000
1, 2000

此外,我只能在 6 秒后在屏幕上看到结果。这意味着任何被竞争的输入都在等待并处理到下一阶段。

如何使管道在完成后立即吐出每个输入的结果?

    public static void Run()
    {
        Func<int, string> fn = n =>
        {
            var sleep = n * 2000;
            Thread.Sleep(sleep);
            return n + ", " + sleep;
        };

        var opts = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4
        };

        var transformBlock = new TransformBlock<int, string>(fn, opts);
        var bufferBlock = new BufferBlock<string>(opts);

        transformBlock.LinkTo(bufferBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 3; i > 0; i--)
            transformBlock.Post(i);

        Console.WriteLine(bufferBlock.Receive());
        Console.WriteLine(bufferBlock.Receive());
        Console.WriteLine(bufferBlock.Receive());
    }
4

1 回答 1

5

默认情况下,数据流会保留消息的顺序,即使它们是并行处理的。

要尽快转换消息,即乱序,EnsureOrderedfalse在您的TransformBlock.

请务必使用最新版本的数据流(目前 nuget 包System.Threading.Tasks.Dataflow存在于版本 4.9 中),因为EnsureOrdered并非总是如此。

例子:

class Program
{
    static void Main( string[] args )
    {
        var transformBlock = new TransformBlock<int, int>( x =>
        {
            Thread.Sleep( x );
            return x;
        }, new ExecutionDataflowBlockOptions {EnsureOrdered = false, MaxDegreeOfParallelism = 10} );
        var actionBlock = new ActionBlock<int>( x => Console.WriteLine( x ) );
        transformBlock.LinkTo( actionBlock, new DataflowLinkOptions {PropagateCompletion = true});
        for( var i = 9; i > 0; i-- )
            transformBlock.Post( i * 1000 );
        transformBlock.Complete();
        actionBlock.Completion.Wait();
        Console.ReadLine();
    }
}

这输出

1000
2000
3000
4000
5000
6000
7000
8000
9000
于 2018-10-10T08:02:04.590 回答