编辑:按照 svick 的建议,我用一个简单的 TransformBlock 替换了自定义 IPropagatorBlock,但是,我仍然看到输入项的顺序和输出项的顺序不匹配。在我传入的 TransformBlock 实例化和 Func 下方:
quoteBuffer = new TransformBlock<Tuple<Symbol, int>, List<Quote>>(syncExecution, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true, MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
//Function that performs Sync Processing
Func<Tuple<Symbol, int>, List<Quote>> syncExecution = new Func<Tuple<Symbol, int>, List<Quote>>(partitionTuple =>
{
Symbol symbol = partitionTuple.Item1;
int partitionIndex = partitionTuple.Item2;
//Read Binary Data
byte[] byteArray = binaryDataReaders[symbol].ReadBytes(partitionIndex);
//Deserialize and return quote list
List<Quote> quoteList = dataInterfaces[symbol].Deserialize(symbol, byteArray);
return quoteList;
});
这就是我发布到转换块的方式:
quoteBuffer.SendAsync(new Tuple<Symbol, int>(symbol, counter));
原始问题:
有人帮助我使用以下自定义转换块。这个想法是发布/发送 TInput 并以异步方式对 TInput 进行操作,而自定义转换块在返回转换后的项目时保留发布项目的顺序。
例如,如果以相应的顺序发布 1、2、3,并且变换函数对每个输入进行平方并返回项目,则正确的输出值和顺序应该是 1、4、9,而不管 3 个变换操作中的哪一个在何时完成.
但是,我怀疑代码有错误,因为输出顺序不正确。更糟糕的是,混乱的订单位置是随机的,这使得调试变得更加困难,但这反映了一个事实,即显然将输入元素转换为输出元素的任务总是以不同的方式完成。
有人可以看看并可能给出一些我在这里缺少的提示吗?谢谢
public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>(Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}