在我的场景中,应用程序使用 CDI 生产者从 JMS 队列中读取消息。
@Produces
public ConnectionFactory connectionFactory() {
JmsFactoryFactory ff;
JmsConnectionFactory factory;
try {
// Get a new JMSConnectionFactory
ff = JmsFactoryFactory.getInstance(JmsConstants.WMQ_PROVIDER);
factory = ...
我正在使用 quarkus-smallrye 实现。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-jms</artifactId>
<version>2.7.0</version>
</dependency>
连接器看起来很简单:
@Incoming("ORDER")
public CompletionStage<Void> consume(Message<String> message) {
return CompletableFuture.runAsync(() -> processMessage(message)).thenRun(message::ack);
// used a Runnable, ...
}
配置:
mp.messaging.incoming."ORDER".connector=smallrye-jms
我尝试了不同的方法来处理消息,但在每种情况下,我都只有一个工作线程。
日志每次显示相同的结果:
[DEBUG] Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
[DEBUG] Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
[DEBUG] Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
[DEBUG] Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
[DEBUG] Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
...
我使用了这个文档并搜索了很多 ;-)
我的错误是什么?为什么我只运行一个线程?如果您需要更多信息,请告诉我。