我正在将现有的 Java/SpringBoot/ActiveMQ 控制台应用程序(它使用来自命名 ActiveMQ 消息队列的消息并处理消息)转换为类似的 C# .NET 控制台应用程序。对于成功案例,我已经让事情正常运行,但是对于消息处理程序无法成功处理消息的情况,我无法复制 Java 应用程序的行为。
当消息处理失败时,我试图复制的失败案例行为是消息的回滚和重新排队(回到接收消息的命名队列中)。
在 Java/SpringBoot 中,我通过将事务配置添加到发生消息处理的适当类/方法来做到这一点。对于失败情况,我抛出一个 RuntimeException(未选中)并允许在 Spring 事务框架中处理此类抛出的异常(使用所有 SpringBoot 默认值进行此类异常处理),并且基于框架的事务回滚处理,消息重新排队。我没有在我的 Java 应用程序代码逻辑中明确地进行任何回滚处理,而是允许框架默认流程来处理所有这些。
我不太熟悉用于在我的 C# 应用程序中实现这种自动回滚/重新排队行为的 Spring .NET 模拟。而且,到目前为止,我还无法复制这一点。根据 Spring .NET 文档的第 31.5.5 节:
在事务中调用消息侦听器只需要重新配置侦听器容器。本地消息事务可以通过设置属性 SessionAcknowledgeMode 来激活,该属性对于 NMS 是枚举类型 AcknowledgementMode,设置为 AcknowledgementMode.Transactional。然后,每个消息侦听器调用将在一个活动的消息传递事务中运行,在侦听器执行失败的情况下回滚消息接收。
我在我的 C# 控制台应用程序的 Main 方法中遵循了上述规定:
static void Main(string[] args)
{
var ctx = ContextRegistry.GetContext();
var msgListenerContainer = ctx.GetObject<SimpleMessageListenerContainer>("MessageListenerContainer");
msgListenerContainer.SessionAcknowledgeMode = AcknowledgementMode.Transactional;
Console.ReadLine();
}
但是,我不清楚如何在我的 C# 控制台应用程序中触发侦听器执行失败。我尝试抛出各种类型的异常(ApplicationException、Exception、NMSException),但我没有看到消息重新排队。
任何启示将不胜感激。
仅供参考,这是额外的相关代码逻辑和配置:
消息处理方式:
public void HandleMessage(Hashtable message)
{
Logger.Debug("Entered HandleMessage");
var ticketUuid = message["ti"] as string;
if (ticketUuid == null) throw new ArgumentNullException("ticketUuid");
var gameTitle = message["gt"] as string;
if (gameTitle == null) throw new ArgumentNullException("gameTitle");
var recallData = message["grd"] as string;
if (recallData == null) throw new ArgumentNullException("recallData");
Logger.Debug(string.Format("HandleMessage - ticketUuid={0} gameTitle={1} recallData={2}", ticketUuid,
gameTitle, recallData));
VideoRecordingService.RecordVideo(ticketUuid, gameTitle, recallData);
}
这是 RecordVideo 方法的一个版本,为了清楚起见,省略了很多不相关的代码逻辑:
[Transaction]
public async Task RecordVideo(string ticketId, string gameTitle, string gameRecallData)
{
<elided code>
// start up the video recording app
Recorder.PowerOn();
// start up the Air Game Exe as a separate process
RunAirGameProc(gameTitle);
// for testing failed message processing
throw new ApplicationException("forced exception");
var videoBytes = File.ReadAllBytes(gameShareVideoFilename);
MsgProducer.UpdateJobStatus(ticketId, TicketStatusEnum.Recorded, videoBytes);
MsgProducer.UploadVideo(ticketId, videoBytes);
}
catch (Exception ex)
{
Logger.WarnFormat("Exception caught: {0}" + Environment.NewLine + "{1}", ex.Message, ex.StackTrace);
throw new NMSException(ex.Message);
}
finally
{
// clean up files
<elided>
}
}
而且,这里是控制台应用程序的相关 Spring .NET 配置:
<spring>
<context>
<resource uri="config://spring/objects" />
</context>
<objects xmlns="http://www.springframework.net">
<description>Game Share Video Recording Service Spring IoC Configuration</description>
<object name="MsgProducer"
type="GameShare.VideoRecorder.MessageProducer, GameShare.VideoRecorder">
<property name="NmsTemplate" ref="NmsTemplate" />
<property name="JobInfoDestination">
<object type="Apache.NMS.ActiveMQ.Commands.ActiveMQQueue, Apache.NMS.ActiveMQ">
<constructor-arg value="gameShareJobInfo" />
</object>
</property>
<property name="VideoUploadDestination">
<object type="Apache.NMS.ActiveMQ.Commands.ActiveMQQueue, Apache.NMS.ActiveMQ">
<constructor-arg value="gameShareVideoUpload" />
</object>
</property>
</object>
<object name="ConnectionFactory"
type="Spring.Messaging.Nms.Connections.CachingConnectionFactory, Spring.Messaging.Nms">
<property name="SessionCacheSize" value="10" />
<property name="TargetConnectionFactory">
<object type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
<constructor-arg index="0" value="tcp://localhost:61616" />
</object>
</property>
</object>
<object name="MessageHandler"
type="GameShare.VideoRecorder.MessageHandler, GameShare.VideoRecorder"
autowire="autodetect" />
<object name="MessageListenerAdapter"
type="Spring.Messaging.Nms.Listener.Adapter.MessageListenerAdapter, Spring.Messaging.Nms">
<property name="HandlerObject" ref="MessageHandler" />
</object>
<object name="MessageListenerContainer"
type="Spring.Messaging.Nms.Listener.SimpleMessageListenerContainer, Spring.Messaging.Nms">
<property name="ConnectionFactory" ref="ConnectionFactory" />
<property name="DestinationName" value="gameShareVideoRecording" />
<property name="MessageListener" ref="MessageListenerAdapter" />
</object>
<object name="NmsTemplate" type="Spring.Messaging.Nms.Core.NmsTemplate, Spring.Messaging.Nms">
<property name="ConnectionFactory" ref="ConnectionFactory" />
</object>
</objects>
</spring>
更新:在重新阅读 Spring .Net 文档的第 31 章(并对我的控制台应用程序进行了更多修改)之后,我找到了解决方案/答案。默认侦听器容器 (Spring.Messaging.Nms.Listener.SimpleMessageListenerContainer) 不提供我试图从我的 Java/Spring/ActiveMQ 模型中模拟的行为(顺便说一句,这是在上下文中消息处理的默认行为使用 Java/SpringBoot 框架的事务)。具体来说,如第 31.5.2 节异步接收中所述:
在消息处理期间抛出的异常可以传递给 IExceptionHandler 的实现,并通过属性 ExceptionListener 向容器注册。如果异常是 NMSException 类型(或其他提供程序的等效根异常类型),则将调用已注册的 IExceptionHandler。SimpleMessageListenerContainer 将在错误级别记录异常,并且不会将异常传播给提供者。确认和/或事务的所有处理都由侦听器容器完成。您可以重写方法 HandleListenerException 以更改此行为。
因此,为了获得所需的行为,我必须提供自己的 IExceptionHandler 和 IErrorHandler 实现,它们将异常传播给提供者。提供者(在我的例子中是 ActiveMQ)将看到异常并将消息重新排队。
这些接口的简单实现如下:
public class VideoRecorderApp
{
static void Main(string[] args)
{
var ctx = ContextRegistry.GetContext();
var msgListenerContainer = ctx.GetObject<SimpleMessageListenerContainer>("MessageListenerContainer");
msgListenerContainer.SessionAcknowledgeMode = AcknowledgementMode.Transactional;
msgListenerContainer.ErrorHandler = new MyErrorHandler();
msgListenerContainer.ExceptionListener = new MyExceptionListener();
Console.ReadLine();
}
}
internal class MyErrorHandler : IErrorHandler
{
/// <summary>
/// The logger
/// </summary>
private static readonly ILog Logger = LogManager.GetLogger<MyErrorHandler>();
/// <summary>
/// Handles the error.
/// </summary>
/// <param name="exception">The exception.</param>
public void HandleError(Exception exception)
{
Logger.WarnFormat("HandleError: {0}", exception.Message);
throw exception;
}
}
internal class MyExceptionListener : IExceptionListener
{
/// <summary>
/// The logger
/// </summary>
private static readonly ILog Logger = LogManager.GetLogger<MyExceptionListener>();
/// <summary>
/// Called when there is an exception in message processing.
/// </summary>
/// <param name="exception">The exception.</param>
public void OnException(Exception exception)
{
Logger.WarnFormat("OnException: {0}", exception.Message);
throw exception;
}
}
而且,如果想要/需要区分特定的异常类型(例如,其中一些应该触发重新排队,而另一些我们只想记录/吞下),当然可以在处理程序中添加代码逻辑来做到这一点.