0

在我的场景中,应用程序使用 CDI 生产者从 JMS 队列中读取消息。

@Produces
  public ConnectionFactory connectionFactory() {
    JmsFactoryFactory ff;
    JmsConnectionFactory factory;
    try {
      // Get a new JMSConnectionFactory
      ff = JmsFactoryFactory.getInstance(JmsConstants.WMQ_PROVIDER);
      factory = ...

我正在使用 quarkus-smallrye 实现。

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>io.smallrye.reactive</groupId>
      <artifactId>smallrye-reactive-messaging-jms</artifactId>
      <version>2.7.0</version>
    </dependency>

连接器看起来很简单:

  @Incoming("ORDER")
  public CompletionStage<Void> consume(Message<String> message) {
    return CompletableFuture.runAsync(() -> processMessage(message)).thenRun(message::ack);
    // used a Runnable, ...
  }

配置:

mp.messaging.incoming."ORDER".connector=smallrye-jms

我尝试了不同的方法来处理消息,但在每种情况下,我都只有一个工作线程。

日志每次显示相同的结果:

 [DEBUG]          Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
 [DEBUG]          Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
 [DEBUG]          Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
 [DEBUG]          Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
 [DEBUG]          Thread Thread[ForkJoinPool.commonPool-worker-15,5,main]
...

我使用了这个文档并搜索了很多 ;-)

我的错误是什么?为什么我只运行一个线程?如果您需要更多信息,请告诉我。

4

0 回答 0