1

编辑:按照 svick 的建议,我用一个简单的 TransformBlock 替换了自定义 IPropagatorBlock,但是,我仍然看到输入项的顺序和输出项的顺序不匹配。在我传入的 TransformBlock 实例化和 Func 下方:

quoteBuffer = new TransformBlock<Tuple<Symbol, int>, List<Quote>>(syncExecution, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true,  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

//Function that performs Sync Processing
Func<Tuple<Symbol, int>, List<Quote>> syncExecution = new Func<Tuple<Symbol, int>, List<Quote>>(partitionTuple =>
{
    Symbol symbol = partitionTuple.Item1;
    int partitionIndex = partitionTuple.Item2;

    //Read Binary Data
    byte[] byteArray = binaryDataReaders[symbol].ReadBytes(partitionIndex);

    //Deserialize and return quote list
    List<Quote> quoteList = dataInterfaces[symbol].Deserialize(symbol, byteArray);

    return quoteList;
});

这就是我发布到转换块的方式:

quoteBuffer.SendAsync(new Tuple<Symbol, int>(symbol, counter));

原始问题:

有人帮助我使用以下自定义转换块。这个想法是发布/发送 TInput 并以异步方式对 TInput 进行操作,而自定义转换块在返回转换后的项目时保留发布项目的顺序。

例如,如果以相应的顺序发布 1、2、3,并且变换函数对每个输入进行平方并返回项目,则正确的输出值和顺序应该是 1、4、9,而不管 3 个变换操作中的哪一个在何时完成.

但是,我怀疑代码有错误,因为输出顺序不正确。更糟糕的是,混乱的订单位置是随机的,这使得调试变得更加困难,但这反映了一个事实,即显然将输入元素转换为输出元素的任务总是以不同的方式完成。

有人可以看看并可能给出一些我在这里缺少的提示吗?谢谢

public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
    {
        var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

        var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
            tuple => tuple.Item2(transform(tuple.Item1)),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        var enqueuer = new ActionBlock<TInput>(
            async item =>
            {
                var tcs = new TaskCompletionSource<TOutput>();
                await processor.SendAsync(
                    new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
                await queue.SendAsync(tcs.Task);
            });

        enqueuer.Completion.ContinueWith(
            _ =>
            {
                queue.Complete();
                processor.Complete();
            });

        return DataflowBlock.Encapsulate(enqueuer, queue);
    }
4

1 回答 1

0

我回答了我自己的问题,因为我发现了导致所有这些麻烦的错误。从我的 lambda 表达式可以看出,我在数据块中读取字节数组,这意味着只要并行度设置为 >1,就会同时从物理磁盘的同一文件中读取字节数组。这显然真的与读取字节的位置混淆了。我使用 br.basestream.seek(...) 设置读取操作的起点,并通过 br.readbytes(numberBytes) 读取字节。由于多个操作同时影响文件中的位置,二进制读取器很可能以无序方式读取字节,从而导致混乱。

我通过将二进制读取器从 lambda 表达式中提取出来解决了这个问题,而是将读取的字节数组传递到表达式中,并且仅将并发用于解决问题的反序列化和合并/排序目的。是的,变换块保留了顺序。感谢 svick 分享您在 tpl 数据流方面的丰富专业知识。

于 2012-10-12T19:07:13.030 回答