1

使用 TPL,我如何从多个 IO 源(“无线程”任务)收集结果并将它们合并到一个序列中,因为它们从各自的源进来,而不为每个源生成一个基于线程的任务来监视它们?从一个线程轮询源是否安全?

while (true)
{
    try
    {
        IEnumerable<UdpClient> readyChannels = 
            from channel in channels
            where channel.Available > 0
            select channel;

        foreach( UdpClient channel in readyChannels)
        {
           var result = await channel.ReceiveAsync();
           //do something with result like post to dataflow block.
        }
    }
    catch (Exception e)
    {
        throw (e);
    }
    ...

那样的事情怎么样?

4

1 回答 1

1

我在这里看到几个选项:

如果您想启动对 的调用ReceiveAsync(),请将它们设置为对结果执行某些操作(例如,如您所说,发送到数据流块)然后忘记它们,您可以使用ContinueWith()

foreach (var channel in readyChannels)
{
   channel.ReceiveAsync().ContinueWith(task => 
   {
       var result = task.Result;
       //do something with result like post to dataflow block.
   }
}

这样做的一个缺点是您需要在每个延续中处理异常。

可能更好的方法是使用OrderByCompletion()Stephen Cleary 的 AsyncEx。这样,您可以一次开始所有读取并在它们完成时对其进行处理:

var tasks = readyChannels.Select(c => c.ReceiveAsync()).OrderByCompletion();

foreach (var task in tasks)
{
   var result = await task;
   //do something with result like post to dataflow block.
}

如果您想限制并行性,另一个有用的选项是使用TransformBlock

var receiveBlock = new TransformBlock<UdpClient, UdpReceiveResult>(
    c => c.ReceiveAsync(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });
foreach (var channel in readyChannels)
    receiveBlock.Post(channel);
receiveBlock.Complete();

// set up processing here

await receiveBlock.Completion;

如果您想将结果发送到另一个块,那么上面评论中提到的处理包括简单地将它们链接在一起:

receiveBlock.LinkTo(anotherBlock);

在上述所有情况下,从来没有线程阻塞来监视任何东西。但是要调用ReceiveAsync()然后处理结果的代码必须在某处执行。

于 2013-01-22T14:50:09.170 回答