我有一个 TPL 数据流操作块,用于接收相机的触发消息,然后进行一些处理。如果处理任务抛出异常,则 ActionBlock 进入故障状态。我想向我的 UI 发送一条错误消息并向 ActionBlock 发送一条重置消息,以便它可以继续处理传入的触发消息。有没有办法将 ActionBlock 返回到就绪状态(清除故障)?
相关代码如下:
using System.Threading.Tasks.Dataflow;
namespace Anonymous
{
/// <summary>
/// Provides a messaging system between objects that inherit from Actor
/// </summary>
public abstract class Actor
{
//The Actor uses an ActionBlock from the DataFlow library. An ActionBlock has an input queue you can
// post messages to and an action that will be invoked for each received message.
//The ActionBlock handles all of the threading issues internally so that we don't need to deal with
// threads or tasks. Thread-safety comes from the fact that ActionBlocks are serialized by default.
// If you send two messages to it at the same time it will buffer the second message until the first
// has been processed.
private ActionBlock<Message> _action;
...Properties omitted for brevity...
/// <summary>
/// Initializes the actor's identity and creates the ActionBlock that will process its messages.
/// </summary>
/// <param name="name">Name stored in <c>_name</c>; it appears in trace and fault messages.</param>
/// <param name="id">Identifier stored in <c>_id</c>.</param>
public Actor(string name, int id)
{
    // Record the identity first so it is available to anything that runs afterwards.
    _id = id;
    _name = name;

    // The message pipeline must exist before SendMessage can be called.
    CreateActionBlock();
}
/// <summary>
/// Creates a new ActionBlock for this actor and stores it in <c>_action</c>.
/// </summary>
/// <remarks>
/// Each message is dispatched through <c>dynamic</c>, so the runtime binder picks the
/// <c>HandleMessage</c> overload that matches the run-time type of the message. Derived classes
/// supply those overloads. In TPL Dataflow, an exception that escapes the handler faults the
/// block's Completion task.
/// </remarks>
private void CreateActionBlock()
{
    // A degree of parallelism of 1 makes the block handle messages strictly one at a time;
    // messages posted while one is being handled wait in the block's input queue.
    var serialOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1
    };

    // Both the receiver and the argument are cast to dynamic so that member lookup happens
    // at run time instead of compile time.
    Action<Message> dispatch = incoming =>
    {
        dynamic receiver = this;
        dynamic payload = incoming;
        receiver.HandleMessage(payload); // HandleMessage is implemented in the derived class
    };

    _action = new ActionBlock<Message>(dispatch, serialOptions);
}
// Serializes fault recovery so that exactly one caller replaces a faulted block and reports it.
private readonly object _faultGate = new object();

// The block whose Completion task already has a fault observer attached.
private ActionBlock<Message> _faultObservedBlock;

/// <summary>
/// Posts a message to the internal ActionBlock for processing and returns immediately.
/// </summary>
/// <remarks>
/// A faulted ActionBlock cannot be reset. When the current block faults, it is replaced with a
/// fresh one and the fault is reported once through <see cref="Faulted"/>, so later messages
/// keep being processed.
/// </remarks>
/// <param name="message">The message to process; its <c>Source</c> must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="ArgumentException">The message's <c>Source</c> is null.</exception>
public void SendMessage(Message message)
{
    // Validation is synchronous (the method is no longer 'async void') so the caller can catch it.
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    if (message.Source == null)
    {
        throw new ArgumentException("Message source cannot be null.", nameof(message));
    }

    ActionBlock<Message> block;
    if (TryPostToCurrentBlock(message, out block))
    {
        return;
    }

    // Post returns false once a block has completed or faulted. If it faulted and the observer
    // has not swapped it out yet, recover here and retry once on the replacement block.
    if (block.Completion.IsFaulted)
    {
        RecoverFromFault(block, block.Completion.Exception.InnerException);
        if (TryPostToCurrentBlock(message, out block))
        {
            return;
        }
    }

    Trace.WriteLine("ActionBlock for " + _name + " declined a message because it is no longer accepting input.");
}

// Posts to the current block, attaching a fault observer the first time each block is used.
private bool TryPostToCurrentBlock(Message message, out ActionBlock<Message> block)
{
    bool attachObserver;
    lock (_faultGate)
    {
        block = _action;
        attachObserver = !ReferenceEquals(block, _faultObservedBlock);
        _faultObservedBlock = block;
    }

    if (attachObserver)
    {
        // Fire-and-forget: one observer per block, instead of one pending await per message.
        ObserveFaultAsync(block);
    }

    return block.Post(message);
}

// Waits for the given block to finish and triggers recovery if it ended with an exception.
private async Task ObserveFaultAsync(ActionBlock<Message> block)
{
    try
    {
        await block.Completion;
    }
    catch (Exception ex)
    {
        RecoverFromFault(block, ex);
    }
}

// Replaces a faulted block with a new one and raises Faulted, at most once per block.
private void RecoverFromFault(ActionBlock<Message> block, Exception ex)
{
    if (!block.Completion.IsFaulted)
    {
        // Completion ended with an exception but is not faulted (e.g. canceled): log only.
        Trace.WriteLine("ActionBlock for " + _name + " failed." + ExceptionExtensions.GetFullMessage(ex));
        return;
    }

    lock (_faultGate)
    {
        if (!ReferenceEquals(block, _action))
        {
            return; // another caller has already replaced and reported this block
        }

        // Install the replacement before notifying, so Faulted handlers can send messages.
        CreateActionBlock();
    }

    Trace.WriteLine("ActionBlock for " + _name + " failed." + ExceptionExtensions.GetFullMessage(ex));

    _isFaulted = true;
    _faultReason = _name + " ActionBlock encountered an exception while processing task: " + ex.ToString();
    FaultMessage msg = new FaultMessage { Source = _name, FaultReason = _faultReason, IsFaulted = _isFaulted };

    try
    {
        OnFaulted(msg);
    }
    catch (Exception handlerEx)
    {
        // A failing subscriber must not undo the recovery or escape the fire-and-forget observer.
        Trace.WriteLine("Faulted handler for " + _name + " threw: " + handlerEx);
    }
}
/// <summary>
/// Raised by <see cref="OnFaulted"/> to report a fault to subscribers.
/// </summary>
public event EventHandler<FaultMessageEventArgs> Faulted;

/// <summary>
/// Raises <see cref="Faulted"/>, passing subscribers a copy of the given fault message.
/// </summary>
/// <param name="message">The fault details; <c>Copy()</c> is called on it only when there are subscribers.</param>
public void OnFaulted(FaultMessage message)
{
    // Snapshot the delegate so the null check and the invocation see the same subscriber list.
    EventHandler<FaultMessageEventArgs> subscribers = Faulted;
    if (subscribers == null)
    {
        return;
    }

    var args = new FaultMessageEventArgs { Message = message.Copy() };
    subscribers(this, args);
}
/// <summary>
/// Gets the task to await for the message processing result.
/// </summary>
/// <remarks>
/// Reading this property has a side effect: it calls <c>Complete()</c> on the internal
/// ActionBlock before returning that block's Completion task.
/// </remarks>
public Task Completion
{
    get
    {
        ActionBlock<Message> block = _action;
        block.Complete();
        return block.Completion;
    }
}
}
}