我们有一种情况,我们设置了一个组件来远程使用 Spring Batch 运行批处理作业。我们发送一条带有作业 xml 路径、名称、参数等的 JMS 消息,并在调用批处理客户端上等待来自服务器的响应。
服务器读取队列并调用适当的方法来运行作业并返回结果,我们的消息传递框架通过以下方式完成:
this.jmsTemplate.send(queueName, messageCreator);
this.LOGGER.debug("Message sent to '" + queueName + "'");
try {
final Destination replyTo = messageCreator.getReplyTo();
final String correlationId = messageCreator.getMessageId();
this.LOGGER.debug("Waiting for the response '" + correlationId + "' back on '" + replyTo + "' ...");
final BytesMessage message = (BytesMessage) this.jmsTemplate.receiveSelected(replyTo, "JMSCorrelationID='"
+ correlationId + "'");
this.LOGGER.debug("Response received");
理想情况下,我们希望能够调用两次 runJobSync 方法,并让两个作业同时运行。我们有一个单元测试,它做类似的事情,没有工作。我意识到这段代码不是很好,但是,这里是:
最终列表结果 = Collections.synchronizedList(new ArrayList());
Thread thread1 = new Thread(new Runnable(){
@Override
public void run() {
client.pingWithDelaySync(1000);
result.add(Thread.currentThread().getName());
}
}, "thread1");
Thread thread2 = new Thread(new Runnable(){
@Override
public void run() {
client.pingWithDelaySync(500);
result.add(Thread.currentThread().getName());
}
}, "thread2");
thread1.start();
Thread.sleep(250);
thread2.start();
thread1.join();
thread2.join();
Assert.assertEquals("both thread finished", 2, result.size());
Assert.assertEquals("thread2 finished first", "thread2", result.get(0));
Assert.assertEquals("thread1 finished second", "thread1", result.get(1));
当我们运行该测试时,线程 2 首先完成,因为它等待了 500 毫秒,而线程 1 等待了 1 秒:
Thread.sleep(delayInMs);
return result;
这很好用。当我们在野外运行两项远程作业时,一项需要大约 50 秒才能完成,另一项设计为立即失败并返回,这不会发生。
启动 50 秒作业,然后立即启动即时失败作业。客户端打印我们发送了一条消息,请求作业运行,服务器打印它收到了 50 秒的请求,但在处理第二条消息之前一直等到 50 秒的作业完成,即使我们使用了 ThreadPoolExecutor。
我们正在使用自动确认运行事务。
做一些远程调试,来自 AbstractPollingMessageListenerContainer 的 Consumer 没有显示未处理的消息(所以 consumer.receive() 显然只是一遍又一遍地返回 null )。amq 代理的 webgui 显示 2 个入队,1 个双端队列,1 个已调度,1 个在调度队列中。这向我表明,有些东西正在阻止 AMQ 让消费者“拥有”第二条消息。(预取为 1000 btw)这显示为特定队列的唯一消费者。
在过去的几天里,我自己和其他一些开发人员一直在四处寻找,但几乎一无所获。任何建议,如果这是预期的行为,我们配置错误,或者在这里会被破坏。
远程调用的方法是否重要?目前,作业处理程序方法使用执行程序在不同的线程中运行作业并执行 future.get() (额外的线程是出于与日志记录相关的原因)。
任何帮助是极大的赞赏