1

我对传递给广播块(BroadcastBlock)的列表集合有一个疑问。下面是我目前的代码(伪代码,因为完整的代码库太长了):

// Pseudo-code from the question. It does not compile as written: the trailing
// "//do something ..." comments inside the lambdas also comment out their closing "});".

// Fan-out source for each incoming List<Quote>.
private BroadcastBlock<List<Quote>> tempBCB;
// Two parallel workers, each mapping a quote list to a Dictionary<int, IParentOrder>.
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb1;
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb2;
// Groups the workers' dictionaries into arrays of 2 (see the constructor argument below).
private BatchBlock<Dictionary<int, IParentOrder>> batchBlock;
// Pairs the original quote list with a batch of dictionaries.
private JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]> joinBlock;
// Final stage: consumes the (quotes, dictionaries[]) tuple and produces the signals.
private TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>,List<MySignal>> transformBlock;

// The clone function returns the same reference, so every linked target receives the
// same List<Quote> instance (no copy). The commented-out line is the deep-copy variant.
tempBCB = new BroadcastBlock<List<Quote>>(quoteList => {
    return quoteList;
    //return Cloning.CloneListCloneValues<Quote>(quoteList);
});

tfb1 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>});

tfb2 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>});

// Batch size 2 with default (greedy) options. NOTE(review): a greedy batch accepts items
// in arrival order, so one batch may hold two dictionaries from the same worker, or
// dictionaries belonging to different quote lists. The self-answer below switches to Greedy = false.
batchBlock = new BatchBlock<Dictionary<int, IParentOrder>>(2);

// Non-greedy join: offers are postponed rather than consumed eagerly.
joinBlock = new JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]>(
    new GroupingDataflowBlockOptions { Greedy = false });

transformBlock = new TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>, List<MySignal>>(
    tuple => { //do something and return List<MySignal>;});

//Linking
// Broadcast -> both workers -> shared batch block -> join (target 2).
// The broadcast also feeds the raw quote list straight into the join (target 1).
tempBCB.LinkTo(tfb1);
tempBCB.LinkTo(tfb2);
tfb1.LinkTo(batchBlock);
tfb2.LinkTo(batchBlock);
tempBCB.LinkTo(joinBlock.Target1);
batchBlock.LinkTo(joinBlock.Target2);
joinBlock.LinkTo(transformBlock);

我的问题是:按照 tempBCB 当前的实现,我在最终的 TransformBlock<TInput, TOutput> 中得到了奇怪的结果。

例如,作为元组一部分的各个 Dictionary<int, IParentOrder> 集合大小不相等,尽管 tfb1 和 tfb2 的实现完全相同。

tempBCB 实现中被注释掉的那一行会对广播的列表做一次深拷贝,这似乎解决了问题。但这个深拷贝让我的代码慢了大约 10 倍,这种数量级的开销迫使我寻找其他解决方案。

首先,我不确定这是否就是问题所在。也可能只是速度变慢让并发操作碰巧按预期执行,而错误其实仍然隐藏在那里。

其次,如果问题确实是广播块中缺少深拷贝造成的,我怎样才能让深拷贝更快?

这是我的深拷贝代码:

/// <summary>
/// Returns a new list whose elements are the results of calling
/// <see cref="ICloneable.Clone"/> on each element of <paramref name="original"/>,
/// in the same order. How deep each element copy is depends on the element
/// type's own Clone implementation.
/// </summary>
/// <param name="original">The list to copy. Null elements are copied as null.</param>
/// <returns>A new list with the same length and order as <paramref name="original"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="original"/> is null.</exception>
public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original)
    where TValue : ICloneable
{
    if (original == null)
    {
        throw new ArgumentNullException("original");
    }

    // A null element has nothing to clone; keep it as null instead of throwing.
    return original.ConvertAll(entry => entry == null ? entry : (TValue)entry.Clone());
}

我可以向广播块传入 Quote[] 而不是 List<Quote>,但我看不出这对提升深拷贝的性能有什么帮助。

我的问题是:

  • 深拷贝真的是这里的问题所在吗?(我对此表示怀疑,因为流入广播块的 List<Quote> 不会被任何转换块修改。)
  • 如果是,原因是什么?又该如何让深拷贝更高效?
4

1 回答 1

1

我来回答自己的问题,因为我最终解决了它。

svick 提醒的问题与广播块中的 List<Quote> 是否需要深拷贝无关(实际上并不需要深拷贝)。问题出在广播块的完成上:它在批处理块之前就被请求完成(与下游数据流块的链接设置了 PropagateCompletion = true)。由于它同时也链接到了 joinBlock,很可能已经把所有项都推送给了 joinBlock。

我干脆去掉了 joinBlock。我重写了转换块,它们现在除了返回自己的转换结果外,还会一并返回原始项,这使 joinBlock 变得多余。

注意主 transformBlock 中的并发:即使在这种轻量工作负载下,将 MaxDegreeOfParallelism 设置为大于 1 也已经带来了性能提升;当投入更重的工作负载时,它的作用会更加明显。

下面是可以编译并正常运行的完整代码(我重命名了一些类,但结构与前面描述的保持一致):

/// <summary>
/// Benchmark harness for the dataflow pipeline:
/// tempBCB (broadcast) -> core1 / core2 (CoreLogic transforms) -> batchBlock (batches of 2)
/// -> transformBlock (merge into FinalObject lists) -> justToFlushTransformBlock (sink).
/// </summary>
public class Test
{
    private Stopwatch watch;

    private readonly BroadcastBlock<List<InputObject>> tempBCB;
    private readonly BatchBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> batchBlock;
    private readonly TransformBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>[], List<FinalObject>> transformBlock;
    private readonly ActionBlock<List<FinalObject>> justToFlushTransformBlock;

    private readonly CoreLogic core1;
    private readonly CoreLogic core2;

    /// <summary>Creates all blocks and links them; no data flows until <see cref="Start"/> is called.</summary>
    public Test()
    {
        // The clone function returns the same reference, so both cores share one input list.
        // That is safe here because nothing downstream mutates the list.
        tempBCB = new BroadcastBlock<List<InputObject>>(input => input);

        //here batch size = 2
        // Greedy = false: offers are postponed instead of being accepted eagerly.
        // NOTE(review): the intent is one result from each core per batch, relying on each core
        // offering its results one at a time in input order. Confirm before relying on pairing.
        batchBlock = new BatchBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>>(2, new GroupingDataflowBlockOptions { Greedy = false });

        transformBlock = new TransformBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>[],List<FinalObject>>(array =>
        {
            // All tuples in a batch are expected to carry the same input list (see batchBlock).
            List<InputObject> inputObjects = array[0].Item1;
            List<FinalObject> ret = inputObjects.ConvertAll(x => new FinalObject(x));

            foreach (var tuple in array)
            {
                // Dictionary keys index into inputObjects, so every key must be < inputObjects.Count.
                foreach (var dictionary in tuple.Item2)
                {
                    ret[dictionary.Key].outputList.Add(dictionary.Value);
                }
            }

            return ret;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        // Sink that drains transformBlock's output so the pipeline can finish.
        justToFlushTransformBlock = new ActionBlock<List<FinalObject>>(list =>
            {
                //just in order to accept items from the transformBlock output queue
            });

        //Generate 2 CoreLogic objects
        core1 = new CoreLogic();
        core2 = new CoreLogic();

        //linking
        tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
        tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // No completion propagation on these two links: the first core to finish would
        // complete batchBlock while the other core still has output. Start() completes it instead.
        core1.transformBlock.LinkTo(batchBlock);
        core2.transformBlock.LinkTo(batchBlock);

        batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

        transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    /// <summary>
    /// Posts 30 input lists of growing size (10,000 * j items for j = 1..30), waits for the
    /// whole pipeline to drain, prints the elapsed time, and then blocks on Console.ReadLine().
    /// </summary>
    /// <exception cref="AggregateException">A block in the pipeline faulted.</exception>
    public void Start()
    {
        const int numberChunks = 30;

        watch = new Stopwatch();
        watch.Start();

        for (int j = 1; j <= numberChunks; j++)
        {
            int collectionSize = 10000 * j;

            List<InputObject> collection = new List<InputObject>(collectionSize);
            for (int i = 0; i < collectionSize; i++)
            {
                collection.Add(new InputObject(i));
            }

            tempBCB.Post(collection);
        }

        tempBCB.Complete();

        // Complete the batch block only once both producers are done. Forward a fault instead of
        // completing normally; otherwise an exception in either core would be silently swallowed.
        Task.WhenAll(core1.transformBlock.Completion, core2.transformBlock.Completion).ContinueWith(t =>
            {
                if (t.IsFaulted)
                {
                    ((IDataflowBlock)batchBlock).Fault(t.Exception);
                }
                else
                {
                    batchBlock.Complete();
                }
            });

        // Wait on the terminal block so every result has been consumed before timing stops.
        justToFlushTransformBlock.Completion.Wait();

        watch.Stop();

        Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}

/// <summary>
/// Wraps one TransformBlock that, for each incoming input list, builds a dictionary of
/// IntermediateObjects keyed 0..N-1 and returns it paired with the unmodified input list.
/// </summary>
public class CoreLogic
{
    // Default N used by the parameterless constructor (matches the original hard-coded value).
    private const int DefaultNumberIntermediateObjects = 10000;

    public TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> transformBlock;

    /// <summary>Creates the block with the default of 10,000 intermediate objects per input.</summary>
    public CoreLogic()
        : this(DefaultNumberIntermediateObjects)
    {
    }

    /// <summary>Creates the block producing <paramref name="numberIntermediateObjects"/> objects per input.</summary>
    /// <param name="numberIntermediateObjects">
    /// Number of IntermediateObjects (keys 0..N-1) generated per input. Consumers that use the keys
    /// as indexes into the input list need N to be no larger than that list's count.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numberIntermediateObjects"/> is negative.</exception>
    public CoreLogic(int numberIntermediateObjects)
    {
        if (numberIntermediateObjects < 0)
        {
            throw new ArgumentOutOfRangeException("numberIntermediateObjects");
        }

        transformBlock = new TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>>(input =>
        {
            //please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return

            // Presized: the final element count is known up front.
            Dictionary<int, IntermediateObject> ret = new Dictionary<int, IntermediateObject>(numberIntermediateObjects);
            for (int i = 0; i < numberIntermediateObjects; i++)
            {
                ret.Add(i, new IntermediateObject(i));
            }

            return Tuple.Create(input, ret);
        });
    }
}

/// <summary>Input item carrying a single integer; cloneable via a memberwise (shallow) copy.</summary>
public class InputObject : ICloneable
{
    /// <summary>Creates an item holding <paramref name="value"/>.</summary>
    public InputObject(int value)
    {
        value1 = value;
    }

    /// <summary>The wrapped value; settable only from inside the class.</summary>
    public int value1 { get; private set; }

    /// <summary>Returns a new instance produced by MemberwiseClone.</summary>
    public InputObject Clone()
    {
        object shallowCopy = MemberwiseClone();
        return (InputObject)shallowCopy;
    }

    // Explicit ICloneable implementation forwards to the strongly typed Clone().
    object ICloneable.Clone()
    {
        InputObject typed = this.Clone();
        return typed;
    }
}

/// <summary>Intermediate result item carrying a single integer.</summary>
public class IntermediateObject
{
    /// <summary>Creates an item holding <paramref name="value"/>.</summary>
    public IntermediateObject(int value)
    {
        value1 = value;
    }

    /// <summary>The wrapped value; settable only from inside the class.</summary>
    public int value1 { get; private set; }
}

/// <summary>Pairs one InputObject with the IntermediateObjects collected for it.</summary>
public class FinalObject
{
    /// <summary>IntermediateObjects gathered for <see cref="input"/>; starts empty.</summary>
    public List<IntermediateObject> outputList;

    /// <summary>Creates a result for <paramref name="input"/> with an empty output list.</summary>
    public FinalObject(InputObject input)
    {
        outputList = new List<IntermediateObject>();
        this.input = input;
    }

    /// <summary>The input item this result belongs to.</summary>
    public InputObject input { get; private set; }
}

/// <summary>Helpers for copying collections.</summary>
public static class Cloning
{
    /// <summary>
    /// Returns a new list whose elements are the results of calling
    /// <see cref="ICloneable.Clone"/> on each element of <paramref name="original"/>,
    /// in the same order. How deep each element copy is depends on the element
    /// type's own Clone implementation (InputObject.Clone, for example, uses MemberwiseClone).
    /// </summary>
    /// <param name="original">The list to copy. Null elements are copied as null.</param>
    /// <returns>A new list with the same length and order as <paramref name="original"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="original"/> is null.</exception>
    public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) where TValue : ICloneable
    {
        if (original == null)
        {
            throw new ArgumentNullException("original");
        }

        // A null element has nothing to clone; keep it as null instead of throwing.
        return original.ConvertAll(entry => entry == null ? entry : (TValue)entry.Clone());
    }
}

希望这能帮助到可能遇到类似问题的其他人。我很喜欢 TPL Dataflow,尤其要感谢 svick,他确实帮助并激励了我更深入地研究。谢谢!

于 2012-12-06T08:03:14.710 回答