0

我认为这是非常基本的方法,但我还没有找到任何例子。我有一个生产者和一个消费者,我想在至少处理 x 个对象时完成管道。此外,我需要知道收到了哪些对象。

我就是这样做的:

public class BlockTester
{
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        _worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        var consumeTask = Consume(buffer);

        _worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});

        foreach (var value in Enumerable.Range(0,100))
        {
            _worker.Post(value);
        }

        _worker.Complete();

        await buffer.Completion;

        if(buffer.TryReceiveAll(out var received))
        {
            Console.WriteLine(string.Join(", ", received));
        }
    }

    public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
    {
        var received = new List<int>();

        while (await buffer.OutputAvailableAsync())
        {
            var current = buffer.Receive();

            received.Add(current);

            if (current > 25)
            {
                _worker.Complete();
            }
        }

        return received;
    }
}

我对 buffer.TryReceiveAll 有点困惑。等待消费任务和 TryReceiveAll 有什么区别?为什么 TryReceiveAll 在我的场景中为假?我想我实现目标的方法仍然有问题。

4

1 回答 1

2

你的Consume方法应该是一个ActionBlock. 没有必要使用OutputAvailableAsyncor TryRecieveAll。将 替换为BufferBlockanActionBlock并在ActionBlock. TransformBlock除非您在此过程中有多个步骤,否则尚不清楚为什么您需要其中一个。

public class BlockTester
{
    //Could be removed
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        //Could be removed
        _worker = new TransformBlock<int, int>(s => s + s);
        var processor = new ActionBlock<int>(x => ProcessMessage(x));

        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100))
        {
            _worker.Post(value);
        }

        //_worker.Complete();

        await processor.Completion;
    }


    private static int itemsRecieved = 0;
    public static void ProcessMessage(int x)
    {
        Interlocked.Increment(ref itemsRecieved);
        if (itemsRecieved > 25) _worker.Complete();
        //process the message
        //log the message etc.
    }
}

或者使用复杂的消息对象:

public class Message { }

public class BlockTester
{
    //Could be removed
    private static TransformBlock<Message, Message> _worker;

    public static async Task StartAsync()
    {
        //Could be removed
        _worker = new TransformBlock<Message, Message>(s => s);
        var processor = new ActionBlock<Message>(x => ProcessMessage(x));

        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
        {
            _worker.Post(value);
        }

        //_worker.Complete();

        await processor.Completion;
    }


    private static ConcurrentBag<Message> itemsRecieved = new ConcurrentBag<Message>();
    public static void ProcessMessage(Message x)
    {
        itemsRecieved.Add(x);
        if (itemsRecieved.Count > 25) _worker.Complete();
        //process the message
        //log the message etc.
    }
}

编辑 回答原始问题:

为什么TryReceiveAll返回false:

因为到TryReceiveAll运行的时候BufferBlock已经“完成”了。对于要完成的块,它的输出缓冲区中必须包含 0 个项目。该Consume方法是在允许完成块之前将所有项目拉出,您最终会调用TryRecieveAll一个空块。

于 2018-03-08T21:47:53.100 回答