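I just wrote a sample producer-consumer pattern using TPL Dataflow, and I have some basic questions about it.

  1. The consumer becomes active only after the producer has posted all the items. Doesn't asynchronous mean that the producing and consuming tasks can run in parallel?

  2. I added a sleep in the consumer to check whether one item blocks the processing of the others. The items seem to be processed sequentially, with no parallelism at all.

Am I doing something wrong here?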

class AscDataBlocks
{
    public Int64 start;
    public Int64 End;
    //public string ThreadName;
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform) 
    // This is using TPL DataBlock producer consumer pattern.
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5) 
        // Testing if this will delay all the other data processing
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);                
        Console.WriteLine(ascDataSet);
    }

    // Demonstrates the consumption end of the producer and consumer pattern. 
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        // Initialize a counter to track the number of bytes that are processed. 
        int status = 0;

        // Read from the source buffer until the source buffer has no  
        // available output data. 
        while (await source.OutputAvailableAsync())
        {
            Int64 data;
            source.TryReceive(out data);
            ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
            // Increment the count of bytes received.
            status = 1;
        }
        return status;
    }
}

static void Main(string[] args)
{
    AscDataBlocks ascb;
    BufferBlock<Int64> ascbuffer;
    System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
    CancellationToken ct = new CancellationToken();
    CancellationTokenSource cts = new CancellationTokenSource();
    ascb = new AscDataBlocks();
    ascb.start = 1;
    ascb.End = 100;           
    try
    {
        ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            CancellationToken = ct
        }); // Initialize the buffer block
        ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
        //Post source data to the dataflow block.
        ascb.AscBufferProducer(ascbuffer);
        ascProcessingconsumer.Wait();
    }
    catch (Exception ex)
    {
        //foreach (var v in ex.InnerExceptions)
        //    Console.WriteLine("msg: " + v.Message);                
    }            
}  
1 Answer

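> The consumer becomes active only after the producer has posted all the items. Doesn't asynchronous mean that the producing and consuming tasks can run in parallel?

This can happen because you post all the items very quickly, before the consumer has had a chance to start. If you add Thread.Sleep(100) to the producer loop, you will see that the two actually do work in parallel.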
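To make the timing effect visible, here is a minimal sketch that slows the producer down by the 100 ms suggested above. The names `InterleavingDemo` and `ConsumeAsync` are hypothetical and not from the question, the item count is cut to 5 for brevity, and the code assumes the System.Threading.Tasks.Dataflow package and C# 7 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class InterleavingDemo // hypothetical name, for illustration only
{
    // Same receive loop shape as the question's consumer, without the processing delay.
    static async Task<int> ConsumeAsync(IReceivableSourceBlock<long> source)
    {
        int received = 0;
        while (await source.OutputAvailableAsync())
        {
            // Drain everything that is currently buffered.
            while (source.TryReceive(out long item))
            {
                Console.WriteLine("Received: {0}", item);
                received++;
            }
        }
        return received;
    }

    static void Main()
    {
        var buffer = new BufferBlock<long>();
        Task<int> consumer = ConsumeAsync(buffer);

        for (long i = 1; i <= 5; i++)
        {
            Console.WriteLine("Posting: {0}", i);
            buffer.Post(i);
            Thread.Sleep(100); // assumed delay that gives the consumer time to run
        }

        // Without Complete(), OutputAvailableAsync() would never return false.
        buffer.Complete();
        Console.WriteLine("Total received: {0}", consumer.Result);
    }
}
```

With the delay in place, the "Posting" and "Received" lines should typically alternate, although the exact interleaving depends on thread scheduling. Removing the Thread.Sleep call tends to reproduce the behavior described in the question.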

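> I added a sleep in the consumer to check whether one item blocks the processing of the others. The items seem to be processed sequentially, with no parallelism at all.

TPL Dataflow is not magic: it won't modify your code to make it execute in parallel. You call AscTransConsumerAsync() once, so don't be surprised that it runs only once, processing the items one after another.

TDF does support parallel processing, but you have to let it execute the processing code itself. To do that, use one of the execution blocks. In your case, ActionBlock seems appropriate.

If you use it, you can configure the block to execute in parallel by setting MaxDegreeOfParallelism. Of course, doing that means you need to make sure the processing delegate is thread-safe.

With that, AscTransConsumerAsync() might now look something like: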

public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
    // counter to track the number of items that are processed
    Int64 count = 0;

    var actionBlock = new ActionBlock<Int64>(
        data =>
        {
            ProcessDataBuffer(data);
            // count has to be accessed in a thread-safe manner
            // be careful about using Interlocked,
            // for more complicated computations, locking might be more appropriate
            Interlocked.Increment(ref count);
        },
        // some small constant might be better than Unbounded, depending on circumstances
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this assumes source will be completed when done,
    // you need to call ascbuffer.Complete() after AscBufferProducer() for this
    await actionBlock.Completion;

    return count;
}
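
For completeness, here is a sketch of how the calling side might look with this version of the consumer, including the Complete() call mentioned in the comment above. It is a trimmed-down variant of the question's class, not the asker's actual code: the item range (1 to 20), the shortened sleep times, and the MaxDegreeOfParallelism value of 4 are all assumptions chosen to keep the run short. It assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class AscDataBlocks
{
    public long Start;
    public long End;

    public void AscBufferProducer(ITargetBlock<long> target)
    {
        for (long i = Start; i < End; i++)
            target.Post(i);
    }

    public void ProcessDataBuffer(long item)
    {
        // Item 5 is deliberately slow; sleep times are shortened assumptions.
        Thread.Sleep(item == 5 ? 2000 : 200);
        Console.WriteLine(item);
    }

    public async Task<long> AscTransConsumerAsync(ISourceBlock<long> source)
    {
        long count = 0;
        var actionBlock = new ActionBlock<long>(
            data =>
            {
                ProcessDataBuffer(data);
                Interlocked.Increment(ref count); // the delegate runs on several threads
            },
            // 4 is an arbitrary small constant picked for this sketch
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        await actionBlock.Completion;
        return count;
    }
}

class Program
{
    static void Main()
    {
        var ascb = new AscDataBlocks { Start = 1, End = 21 };
        var buffer = new BufferBlock<long>();

        Task<long> consumer = ascb.AscTransConsumerAsync(buffer);
        ascb.AscBufferProducer(buffer);

        // Signal that no more items are coming; completion then propagates
        // through the link to the ActionBlock once the buffer is drained.
        buffer.Complete();

        Console.WriteLine("Processed {0} items", consumer.Result);
    }
}
```

In this sketch the slow item 5 should no longer hold up the others: the remaining items keep printing while it sleeps, so the printed order is not strictly ascending.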
answered 2013-01-10T13:19:40.423