2

发布到 ActionBlock 时,我们看到了意外行为,即使 MaxDegreeOfParallelism 为 1,并行处理似乎也在发生。这是场景。

发布到 ActionBlock 的类如下所示:

public class ByteHandler {
...
   public ByteHandler(ByteHandlingCallback delegateCallback){
       _byteHandlingActionBlock = new ActionBlock<byte[]>(bytes => delegateCallback.HandleBytes(bytes));
   }


   public void HandleBytes(byte[] bytes){
       _byteHandlingActionBlock.Post(bytes);
   }

在下游,我们将字节反序列化为对象,并根据它们的类型将这些对象(我们称之为通知)传递给处理程序:

public class NotificationHandler{
   private readonly Dictionary<string, AutoResetEvent> _secondNoticeReceivedEvents;

   public void HandleInitialNotification(InitialNotification notification){
       var secondNoticeReceivedEvent = new AutoResetEvent(false);
       if (!_secondNoticeReceivedEvents.TryAdd(notification.ID, secondNoticeReceivedEvent)) return;

       DoSomethingDownstream(notification);

       if (secondNoticeReceivedEvent.WaitOne(_timeout))
            DoSomethingElseDownstream();
       else
            throw new Exception("Second notification not received within timeout!");

       _secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent);
   }

   public void HandleSecondNotification(SecondNotification notification){
       AutoResetEvent secondNoticeReceivedEvent;
       if (_secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent))
            secondNoticeReceivedEvent.Set();
   }

这个处理程序有一个致命的错误:InitialNotifications 在其对应的 SecondNotifications 之前进入,但 HandleInitialNotification 在退出之前等待 HandleSecondNotification,因此线程永远不会到达 HandleSecondNotification。

通常,我们看到 HandleInitialNotification 一直阻塞,直到它等待 HandleSecondNotification 超时,然后在同一线程上处理挂起的 SecondNotification 继续执行。这是我们通常在日志中看到的内容:

2013-07-05 13:27:25,755 [13] INFO  Received InitialNotification for: XX
2013-07-05 13:27:35,758 [13] WARN  Second notification not not received for XX within timeout!
2013-07-05 13:27:35,761 [13] INFO  Received SecondNotification for: XX

这不是代码的预期工作方式,但考虑到它的编写方式,它应该总是超时等待 SecondNotification。但是,我们偶尔也会看到 HandleInitialNotification 在超时之前完成,HandleSecondNotification 在不同的线程上被及时处理:

2013-07-05 13:38:13,232 [56] INFO  Received InitialNotification for: YY
2013-07-05 13:38:13,258 [11] INFO  Received SecondNotification for: YY

由于我们使用默认的 ActionBlock,MaxDegreeOfParallelism 应该为 1。那么,第二个线程(源自 ActionBlock)如何在发布到 ActionBlock 的原始线程阻塞时接收 SecondNotification?

4

1 回答 1

0

请注意,您的超时/警告几乎在 10 秒内发生,而所有其他事件以毫秒为单位发生。您很可能以某种方式同步调用通知处理程序方法(HandleInitialNotification 和 HandleSecondNotification),以便操作块在操作块抓取其缓冲区中的下一项之前等待 HandleInitialNotification 中的超时完成。您需要异步执行超时等待。

于 2013-12-03T21:48:23.593 回答