发布到 ActionBlock 时,我们看到了意外行为,即使 MaxDegreeOfParallelism 为 1,并行处理似乎也在发生。这是场景。
发布到 ActionBlock 的类如下所示:
public class ByteHandler {
...
public ByteHandler(ByteHandlingCallback delegateCallback){
_byteHandlingActionBlock = new ActionBlock<byte[]>(bytes => delegateCallback.HandleBytes(bytes));
}
public void HandleBytes(byte[] bytes){
_byteHandlingActionBlock.Post(bytes);
}
在下游,我们将字节反序列化为对象,并根据它们的类型将这些对象(我们称之为通知)传递给处理程序:
public class NotificationHandler{
private readonly Dictionary<string, AutoResetEvent> _secondNoticeReceivedEvents;
public void HandleInitialNotification(InitialNotification notification){
var secondNoticeReceivedEvent = new AutoResetEvent(false);
if (!_secondNoticeReceivedEvents.TryAdd(notification.ID, secondNoticeReceivedEvent)) return;
DoSomethingDownstream(notification);
if (secondNoticeReceivedEvent.WaitOne(_timeout))
DoSomethingElseDownstream();
else
throw new Exception("Second notification not received within timeout!");
_secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent);
}
public void HandleSecondNotification(SecondNotification notification){
AutoResetEvent secondNoticeReceivedEvent;
if (_secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent))
secondNoticeReceivedEvent.Set();
}
这个处理程序有一个致命的错误:InitialNotifications 在其对应的 SecondNotifications 之前进入,但 HandleInitialNotification 在退出之前等待 HandleSecondNotification,因此线程永远不会到达 HandleSecondNotification。
通常,我们看到 HandleInitialNotification 一直阻塞,直到它等待 HandleSecondNotification 超时,然后在同一线程上处理挂起的 SecondNotification 继续执行。这是我们通常在日志中看到的内容:
2013-07-05 13:27:25,755 [13] INFO Received InitialNotification for: XX
2013-07-05 13:27:35,758 [13] WARN Second notification not not received for XX within timeout!
2013-07-05 13:27:35,761 [13] INFO Received SecondNotification for: XX
这不是代码的预期工作方式,但考虑到它的编写方式,它应该总是超时等待 SecondNotification。但是,我们偶尔也会看到 HandleInitialNotification 在超时之前完成,HandleSecondNotification 在不同的线程上被及时处理:
2013-07-05 13:38:13,232 [56] INFO Received InitialNotification for: YY
2013-07-05 13:38:13,258 [11] INFO Received SecondNotification for: YY
由于我们使用默认的 ActionBlock,MaxDegreeOfParallelism 应该为 1。那么,第二个线程(源自 ActionBlock)如何在发布到 ActionBlock 的原始线程阻塞时接收 SecondNotification?