1

我正在尝试使用 NMS 的新延迟交付功能。

schedulerSupport 属性已在配置文件中设置,我正在使用以下代码尝试延迟消息的传递,直到达到用户选择的日期/时间。

代码(目前似乎不起作用)如下:

var timeDelay = dateTimePicker.Value.Subtract(DateTime.Now).TotalMilliseconds;     
var message = topicPublisher.CreateTextMessage();
message.Properties["AMQ_SCHEDULED_DELAY"] = timeDelay;
message.Text = CM.ToXMLString();

topicPublisher.Send(message);

你能指出这个例子中可能不正确的地方吗?

非常感谢!

4

2 回答 2

3

从提供的代码中我看不到任何明显的东西。

您可以尝试在代理中打开日志以查看调度程序是否收到消息并且值是否正确,这也将确认您确实启用了调度程序支持。您还可以尝试创建一个小型 Java 程序,该程序执行类似的操作来确定 NMS 客户端的行为是否正确。

我假设您有一个消费者正在运行并且它的连接对象已经启动?

问候蒂姆。

www.fusesource.com

于 2011-02-23T12:10:12.020 回答
3

感谢您对蒂姆的帮助。我已经找到了问题的原因。我的反应稍有延迟——我不得不专注于其他工作领域,但我今天设法回到了这一点。

问题是 C# ".TotalMilliseconds" 函数——它返回一个分数的总.部分毫秒值,很明显 ActiveMQ 不喜欢这个值。

现在我将此毫秒值转换为整数,延迟消息按要求工作。

日志摘录如下:

2011-03-07 10:14:44,186 | DEBUG | quasar adding destination: topic://REDACTED | org.apache.activemq.broker.region.AbstractRegion | ActiveMQ Transport: tcp:///REDACTED:50161
2011-03-07 10:14:44,576 | DEBUG | Error occured while processing async command: ActiveMQTextMessage {commandId = 4, responseRequired = false, messageId = ID:HL003323-50159-634350896828327757-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:HL003323-50159-634350896828327757-0:0:1:1, destination = topic://REDACTED, transactionId = null, expiration = 0, timestamp = 1299492884437, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1ae0436, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {targetModule=HL.Services.Blackbird.OrderManager, AMQ_SCHEDULED_DELAY=53564.4233}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = <?xml version="1.0" encoding="utf-16"?>
<Com...mandMessage>}, exception: java.lang.NullPointerException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///REDACTED:50161
java.lang.NullPointerException
        at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:179)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:462)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:595)
2011-03-07 10:14:44,577 | WARN  | Async error occurred: java.lang.NullPointerException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///REDACTED:50161
java.lang.NullPointerException
        at org.apache.activemq.broker.scheduler.SchedulerBroker.send(SchedulerBroker.java:179)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
        at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:462)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
        at java.lang.Thread.run(Thread.java:595)
于 2011-03-07T10:28:56.557 回答