2

我是TPL 数据流的新手,我正在寻找一种构造,它允许拆分源消息列表以进行均匀分布的并行处理,同时通过各个管道保持消息消息的顺序。DataFlow API 中是否有特定的块或概念可用于完成此任务,或者更多的是在现有块之间提供胶水代码或自定义块?

对于那些熟悉Akka.NET的人,我正在寻找类似于ConsistentHashing 路由器的功能,它允许将消息发送到单个路由器,然后将这些消息转发到要处理的单个路由。

同步示例:

var count = 100000;
var processingGroups = 5;
var source = Enumerable.Range(1, count);

// Distribute source elements consistently and evenly into a specified set of groups (ex. 5) so that.
var distributed = source.GroupBy(s => s % processingGroups);

// Within each of the 5 processing groups go through each item and add 1 to it
var transformed = distributed.Select(d => d.Select(i => i + 3).ToArray());

List<int[]> result = transformed.ToList();
Check.That(result.Count).IsEqualTo(processingGroups);
for (int i = 0; i < result.Count; i++)
{
    var outputGroup = result[i];

    var expectedRange = Enumerable.Range(i + 1, count/processingGroups).Select((e, index) => e + (index * (processingGroups - 1)) + 3);
    Check.That(outputGroup).ContainsExactly(expectedRange);
}
4

2 回答 2

1

一般来说,我不认为您正在寻找的东西是在 Dataflow 中预先制作的,因为它可能是使用 ConsistentHashing 路由器。但是,通过向您希望流动的数据片段添加 id,您可以以任何顺序并行处理它们,并在处理完成时重新排序它们。

public class Message {
        public int MessageId { get; set; }
        public int GroupId { get; set; }        
        public int Value { get; set; }
    }

    public class MessageProcessing
    {
        public void abc() {
            var count = 10000;
            var groups = 5;
            var source = Enumerable.Range(0, count);

            //buffer all input
            var buffer = new BufferBlock<IEnumerable<int>>();

            //split each input enumerable into processing groups
            var messsageProducer = new TransformManyBlock<IEnumerable<int>, Message>(ints => 
            ints.Select((i, index) => new Message() { MessageId = index, GroupId = index % groups, Value = i }).ToList());

            //process each message, one action block may process any group id in any order
            var processMessage = new TransformBlock<Message, Message>(msg => 
            {
                msg.Value++;
                return msg;
            }, new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = groups
            });

            //output of processed message values
            int[] output = new int[count];

            //insert messages into array in the order the started in
            var regroup = new ActionBlock<Message>(msg => output[msg.MessageId] = msg.Value, 
                new ExecutionDataflowBlockOptions() {
                    MaxDegreeOfParallelism = 1
                });
        }        

    }

在该示例中,未使用消息的 GroupId,但可以在更完整的示例中使用它来协调消息组。此外,可以通过将输出数组更改为 List 并在每次将可枚举的整数发布到缓冲区块时设置相应的列表元素来处理缓冲区块的后续帖子。根据您的具体用途,您可能需要支持输出的多个用户,这可以折叠回流程中。

于 2016-12-25T18:05:53.513 回答
0

您可以动态创建管道,并根据谓词在彼此之间链接块:

var count = 100;
var processingGroups = 5;
var source = Enumerable.Range(1, count);

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {  });
var consumer2 = new ActionBlock<int>(i => {  });
var consumer3 = new ActionBlock<int>(i => {  });
var consumer4 = new ActionBlock<int>(i => { Console.WriteLine(i); });
var consumer5 = new ActionBlock<int>(i => {  });

buffer.LinkTo(consumer1, i => i % 5 == 1);
buffer.LinkTo(consumer2, i => i % 5 == 2);
buffer.LinkTo(consumer3, i => i % 5 == 3);
buffer.LinkTo(consumer4, i => i % 5 == 4);
buffer.LinkTo(consumer5);

foreach (var i in source)
{
    buffer.Post(i);
    // consider async option if you able to do it
    // await buffer.SendAsync(i);
}
buffer.Complete();
Console.ReadLine();

上面的代码将只写入第 4 组的数字,默默地处理其他组,但我希望你明白了。通常的做法是为至少一个消费者链接一个块,而不过滤未被任何消费者接受的消息,如果它们没有被任何消费者接受,则可以这样做(NullTarget<int>只需忽略所有它收到的消息):

buffer.LinkTo(DataflowBlock.NullTarget<int>());

这样做的缺点是它的优点的延续:您必须提供谓词,因为没有内置结构。但是,它仍然可以完成。

于 2017-01-26T17:14:19.450 回答