1

我正在尝试删除 ActiveMQ 中的计划作业,但到目前为止还没有运气。

使用NMS APIAmqpnetlite在消息中创建计划(openwire lib 除外,因为该库未更新且不能在 netstandard/netcore 上使用)

用于使用 NMS 创建计划的示例代码,与 AMQP 库相同:

var factory = new Apache.NMS.ActiveMQ.ConnectionFactory(brokerUri);
IConnection connection = factory.CreateConnection(user, password);
connection.Start();

ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IDestination dest = session.GetQueue(destination);
IMessageProducer producer = session.CreateProducer(dest);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

var msg = session.CreateTextMessage("Sample text message");
msg.Properties.SetString("AMQ_SCHEDULED_CRON", "* * * * *");
producer.Send(msg);
connection.Close();

这部分在浏览器控制台中产生以下结果,这就是我愿意删除的:

代码结果

我已经阅读了这个其他问题和答案,还有活动的 mq 系统常量,但没有办法删除计划。还尝试浏览文档,但到目前为止找不到任何有用的东西

ActiveMQ 是否甚至支持以编程方式管理计划?AMQP 解决方案会很棒,但 NMS 也值得赞赏。

4

2 回答 2

2

您可以通过 STOMP、AMQP 或仅从 JMS 客户端管理 ActiveMQ 中的计划作业。在展示如何使用 ActiveMQ Java 客户端执行此操作之前,我已经写过此内容,但原理是相同的。您可以发送带有特定标头集的消息,这些标头将对预定的消息进行操作。

要浏览预定消息的集合,您需要执行以下操作:

    Connection connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Create the Browse Destination and the Reply To location
    Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
    Destination browseDest = session.createTemporaryQueue();

    // Create the "Browser"
    MessageConsumer browser = session.createConsumer(browseDest);

    connection.start();

    // Send the browse request
    MessageProducer producer = session.createProducer(requestBrowse);
    Message request = session.createMessage();
    request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                              ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
    request.setJMSReplyTo(browseDest);
    producer.send(request);

    Message scheduled = browser.receive(5000);
    while (scheduled != null) {
        // Do something clever...
    }

返回的消息包含有关先前已添加的实际计划消息作业的信息。获取作业 ID 允许您完全取消所述消息的传递。

要删除使用 Java 客户端、AMQP 客户端或其他协议客户端安排的预定消息发送,您需要执行以下操作:

    Message remove = session.createMessage();
    remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
            ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
    remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
            scheduled.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
    producer.send(remove);

使用调度程序时可以使用的全套消息属性值在此处记录,在 AMQP 中只需使用每个字符串文字作为您使用作业 ID 设置为远程的应用程序属性值,或者在 NMS 客户端中使用它' d 只是带有要删除的作业 ID 的字符串键消息属性。

但是在通过 AMQP 执行此操作时有一个警告,那就是您需要确保代理正在使用 JMS 转换器,?transport.transformer=jms"请参阅 ActiveMQ 5 的AMQP 文档

于 2020-09-10T14:22:46.270 回答
2

接受的答案有效且正确。原来scheduledMessage.NMSMessageId没有持有调度程序ID。

如果有人感兴趣,这是干净的 C# 源代码:

var factory = new Apache.NMS.ActiveMQ.ConnectionFactory(brokerUri);
IConnection connection = factory.CreateConnection(userName, password);

var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var requestBrowse = session.GetTopic("ActiveMQ.Scheduler.Management");
var queue = session.GetQueue(queueName);
var consumer = session.CreateConsumer(queue);

connection.Start();

var producer = session.CreateProducer(requestBrowse);
var scheduledMessage = consumer.Receive(TimeSpan.FromSeconds(10));
if (scheduledMessage != null)
{
    // do check with persistent storage, if schedule is canceled, remove it:
    var remove = session.CreateMessage();
    // get prop names from : http://activemq.apache.org/maven/apidocs/constant-values.htm
    remove.Properties["AMQ_SCHEDULER_ACTION"] = "REMOVE";
    remove.Properties["scheduledJobId"] = scheduledMessage.Properties.GetString("scheduledJobId");
    producer.Send(remove);
}

producer.Close();
session.Close();
connection.Close();

流程是这样的:从某个队列中获取消息,如果满足某些条件,则完全放弃调度。

于 2020-09-10T19:53:29.940 回答