1

我正在将现有的 Java/SpringBoot/ActiveMQ 控制台应用程序(它使用来自命名 ActiveMQ 消息队列的消息并处理消息)转换为类似的 C# .NET 控制台应用程序。对于成功案例,我已经让事情正常运行,但是对于消息处理程序无法成功处理消息的情况,我无法复制 Java 应用程序的行为。

当消息处理失败时,我试图复制的失败案例行为是消息的回滚和重新排队(回到接收消息的命名队列中)。

在 Java/SpringBoot 中,我通过将事务配置添加到发生消息处理的适当类/方法来做到这一点。对于失败情况,我抛出一个 RuntimeException(未选中)并允许在 Spring 事务框架中处理此类抛出的异常(使用所有 SpringBoot 默认值进行此类异常处理),并且基于框架的事务回滚处理,消息重新排队。我没有在我的 Java 应用程序代码逻辑中明确地进行任何回滚处理,而是允许框架默认流程来处理所有这些。

我不太熟悉用于在我的 C# 应用程序中实现这种自动回滚/重新排队行为的 Spring .NET 模拟。而且,到目前为止,我还无法复制这一点。根据 Spring .NET 文档的第 31.5.5 节:

在事务中调用消息侦听器只需要重新配置侦听器容器。本地消息事务可以通过设置属性 SessionAcknowledgeMode 来激活,该属性对于 NMS 是枚举类型 AcknowledgementMode,设置为 AcknowledgementMode.Transactional。然后,每个消息侦听器调用将在一个活动的消息传递事务中运行,在侦听器执行失败的情况下回滚消息接收。

我在我的 C# 控制台应用程序的 Main 方法中遵循了上述规定:

static void Main(string[] args)
{
    var ctx = ContextRegistry.GetContext();
    var msgListenerContainer = ctx.GetObject<SimpleMessageListenerContainer>("MessageListenerContainer");
    msgListenerContainer.SessionAcknowledgeMode = AcknowledgementMode.Transactional;

    Console.ReadLine();
}

但是,我不清楚如何在我的 C# 控制台应用程序中触发侦听器执行失败。我尝试抛出各种类型的异常(ApplicationException、Exception、NMSException),但我没有看到消息重新排队。

任何启示将不胜感激。

仅供参考,这是额外的相关代码逻辑和配置:

消息处理方式:

public void HandleMessage(Hashtable message)
{
    Logger.Debug("Entered HandleMessage");

    var ticketUuid = message["ti"] as string;
    if (ticketUuid == null) throw new ArgumentNullException("ticketUuid");
    var gameTitle = message["gt"] as string;
    if (gameTitle == null) throw new ArgumentNullException("gameTitle");
    var recallData = message["grd"] as string;
    if (recallData == null) throw new ArgumentNullException("recallData");
    Logger.Debug(string.Format("HandleMessage - ticketUuid={0} gameTitle={1} recallData={2}", ticketUuid,
        gameTitle, recallData));
    VideoRecordingService.RecordVideo(ticketUuid, gameTitle, recallData);
}

这是 RecordVideo 方法的一个版本,为了清楚起见,省略了很多不相关的代码逻辑:

    [Transaction]
    public async Task RecordVideo(string ticketId, string gameTitle, string gameRecallData)
    {
            <elided code>

            // start up the video recording app
            Recorder.PowerOn();

            // start up the Air Game Exe as a separate process
            RunAirGameProc(gameTitle);

            // for testing failed message processing
            throw new ApplicationException("forced exception");

            var videoBytes = File.ReadAllBytes(gameShareVideoFilename);
            MsgProducer.UpdateJobStatus(ticketId, TicketStatusEnum.Recorded, videoBytes);
            MsgProducer.UploadVideo(ticketId, videoBytes);
        }
        catch (Exception ex)
        {
            Logger.WarnFormat("Exception caught: {0}" + Environment.NewLine + "{1}", ex.Message, ex.StackTrace);
            throw new NMSException(ex.Message);
        }
        finally
        {
            // clean up files
            <elided>
        }
    }

而且,这里是控制台应用程序的相关 Spring .NET 配置:

<spring>
    <context>
      <resource uri="config://spring/objects" />
    </context>
    <objects xmlns="http://www.springframework.net">
      <description>Game Share Video Recording Service Spring IoC Configuration</description>
      <object name="MsgProducer"
              type="GameShare.VideoRecorder.MessageProducer, GameShare.VideoRecorder">
        <property name="NmsTemplate" ref="NmsTemplate" />
        <property name="JobInfoDestination">
          <object type="Apache.NMS.ActiveMQ.Commands.ActiveMQQueue, Apache.NMS.ActiveMQ">
            <constructor-arg value="gameShareJobInfo" />
          </object>
        </property>
        <property name="VideoUploadDestination">
          <object type="Apache.NMS.ActiveMQ.Commands.ActiveMQQueue, Apache.NMS.ActiveMQ">
            <constructor-arg value="gameShareVideoUpload" />
          </object>
        </property>
      </object>
      <object name="ConnectionFactory"
              type="Spring.Messaging.Nms.Connections.CachingConnectionFactory, Spring.Messaging.Nms">
        <property name="SessionCacheSize" value="10" />
        <property name="TargetConnectionFactory">
          <object type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
            <constructor-arg index="0" value="tcp://localhost:61616" />
          </object>
        </property>
      </object>
      <object name="MessageHandler"
              type="GameShare.VideoRecorder.MessageHandler, GameShare.VideoRecorder"
              autowire="autodetect" />
      <object name="MessageListenerAdapter"
              type="Spring.Messaging.Nms.Listener.Adapter.MessageListenerAdapter, Spring.Messaging.Nms">
        <property name="HandlerObject" ref="MessageHandler" />
      </object>
      <object name="MessageListenerContainer"
              type="Spring.Messaging.Nms.Listener.SimpleMessageListenerContainer, Spring.Messaging.Nms">
        <property name="ConnectionFactory" ref="ConnectionFactory" />
        <property name="DestinationName" value="gameShareVideoRecording" />
        <property name="MessageListener" ref="MessageListenerAdapter" />
      </object>
      <object name="NmsTemplate" type="Spring.Messaging.Nms.Core.NmsTemplate, Spring.Messaging.Nms">
        <property name="ConnectionFactory" ref="ConnectionFactory" />
      </object>
    </objects>
  </spring>

更新:在重新阅读 Spring .Net 文档的第 31 章(并对我的控制台应用程序进行了更多修改)之后,我找到了解决方案/答案。默认侦听器容器 (Spring.Messaging.Nms.Listener.SimpleMessageListenerContainer) 不提供我试图从我的 Java/Spring/ActiveMQ 模型中模拟的行为(顺便说一句,这在上下文中消息处理的默认行为使用 Java/SpringBoot 框架的事务)。具体来说,如第 31.5.2 节异步接收中所述:

在消息处理期间抛出的异常可以传递给 IExceptionHandler 的实现,并通过属性 ExceptionListener 向容器注册。如果异常是 NMSException 类型(或其他提供程序的等效根异常类型),则将调用已注册的 IExceptionHandler。SimpleMessageListenerContainer 将在错误级别记录异常,并且不会将异常传播给提供者。确认和/或事务的所有处理都由侦听器容器完成。您可以重写方法 HandleListenerException 以更改此行为。

因此,为了获得所需的行为,我必须提供自己的 IExceptionHandler 和 IErrorHandler 实现,它们将异常传播给提供者。提供者(在我的例子中是 ActiveMQ)将看到异常并将消息重新排队。

这些接口的简单实现如下:

public class VideoRecorderApp
{
    static void Main(string[] args)
    {
        var ctx = ContextRegistry.GetContext();
        var msgListenerContainer = ctx.GetObject<SimpleMessageListenerContainer>("MessageListenerContainer");
        msgListenerContainer.SessionAcknowledgeMode = AcknowledgementMode.Transactional;
        msgListenerContainer.ErrorHandler = new MyErrorHandler();
        msgListenerContainer.ExceptionListener = new MyExceptionListener();

        Console.ReadLine();
    }
}

internal class MyErrorHandler : IErrorHandler
{
    /// <summary>
    ///     The logger
    /// </summary>
    private static readonly ILog Logger = LogManager.GetLogger<MyErrorHandler>();

    /// <summary>
    /// Handles the error.
    /// </summary>
    /// <param name="exception">The exception.</param>
    public void HandleError(Exception exception)
    {
        Logger.WarnFormat("HandleError: {0}", exception.Message);
        throw exception;
    }
}

internal class MyExceptionListener : IExceptionListener
{
    /// <summary>
    ///     The logger
    /// </summary>
    private static readonly ILog Logger = LogManager.GetLogger<MyExceptionListener>();

    /// <summary>
    /// Called when there is an exception in message processing.
    /// </summary>
    /// <param name="exception">The exception.</param>
    public void OnException(Exception exception)
    {
        Logger.WarnFormat("OnException: {0}", exception.Message);
        throw exception;
    }
}

而且,如果想要/需要区分特定的异常类型(例如,其中一些应该触发重新排队,而另一些我们只想记录/吞下),当然可以在处理程序中添加代码逻辑来做到这一点.

4

0 回答 0