1

我们有一种情况,我们设置了一个组件来远程使用 Spring Batch 运行批处理作业。我们发送一条带有作业 xml 路径、名称、参数等的 JMS 消息,并在调用批处理客户端上等待来自服务器的响应。

服务器读取队列并调用适当的方法来运行作业并返回结果,我们的消息传递框架通过以下方式完成:

    this.jmsTemplate.send(queueName, messageCreator);
    this.LOGGER.debug("Message sent to '" + queueName + "'");

    try {
        final Destination replyTo = messageCreator.getReplyTo();
        final String correlationId = messageCreator.getMessageId();

        this.LOGGER.debug("Waiting for the response '" + correlationId + "' back on '" + replyTo + "' ...");
        final BytesMessage message = (BytesMessage) this.jmsTemplate.receiveSelected(replyTo, "JMSCorrelationID='"
                + correlationId + "'");
        this.LOGGER.debug("Response received");

理想情况下,我们希望能够调用两次 runJobSync 方法,并让两个作业同时运行。我们有一个单元测试,它做类似的事情,没有工作。我意识到这段代码不是很好,但是,这里是:

最终列表结果 = Collections.synchronizedList(new ArrayList());

    Thread thread1 = new Thread(new Runnable(){

        @Override
        public void run() {
            client.pingWithDelaySync(1000); 
            result.add(Thread.currentThread().getName());
        }

    }, "thread1");

    Thread thread2 = new Thread(new Runnable(){

        @Override
        public void run() {
            client.pingWithDelaySync(500);              
            result.add(Thread.currentThread().getName());
        }

    }, "thread2");

    thread1.start();
    Thread.sleep(250);
    thread2.start();

    thread1.join();
    thread2.join();

    Assert.assertEquals("both thread finished", 2, result.size());
    Assert.assertEquals("thread2 finished first", "thread2", result.get(0));
    Assert.assertEquals("thread1 finished second", "thread1", result.get(1));

当我们运行该测试时,线程 2 首先完成,因为它等待了 500 毫秒,而线程 1 等待了 1 秒:

Thread.sleep(delayInMs);
    return result;

这很好用。当我们在野外运行两项远程作业时,一项需要大约 50 秒才能完成,另一项设计为立即失败并返回,这不会发生。

启动 50 秒作业,然后立即启动即时失败作业。客户端打印我们发送了一条消息,请求作业运行,服务器打印它收到了 50 秒的请求,但在处理第二条消息之前一直等到 50 秒的作业完成,即使我们使用了 ThreadPoolExecutor。

我们正在使用自动确认运行事务。

做一些远程调试,来自 AbstractPollingMessageListenerContainer 的 Consumer 没有显示未处理的消息(所以 consumer.receive() 显然只是一遍又一遍地返回 null )。amq 代理的 webgui 显示 2 个入队,1 个双端队列,1 个已调度,1 个在调度队列中。这向我表明,有些东西正在阻止 AMQ 让消费者“拥有”第二条消息。(预取为 1000 btw)这显示为特定队列的唯一消费者。

在过去的几天里,我自己和其他一些开发人员一直在四处寻找,但几乎一无所获。任何建议,如果这是预期的行为,我们配置错误,或者在这里会被破坏。

远程调用的方法是否重要?目前,作业处理程序方法使用执行程序在不同的线程中运行作业并执行 future.get() (额外的线程是出于与日志记录相关的原因)。

任何帮助是极大的赞赏

4

1 回答 1

0

not sure I follow completely, but off the top, you should try the following...

  • set the concurrentConsumers/maxConcurrentConsumers greater than the default (1) on the MessageListenerContainer
  • set the prefetch to 0 to better promote balancing messages between consumers, etc.
于 2012-10-18T22:38:53.567 回答