2

@Incoming("queue")是否可以使用带有 quarkus 和 smallrye-reactive-messaging注释的相同方法并行处理多个 amqp -消息?

更准确地说,我有以下课程:

@ApplicationScoped
public class Receiver {
    @Incoming("test-queue")
    public void process(String input) {
        System.out.println("start processing:" + input);
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end processing:" + input);
    }
}

使用 application.properties 中的配置:

amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue

现在我想通过配置定义可以并行处理多少消息。例如,在 4 核 cpu 上,它应该并行运行 4 个。

目前,我可以添加 4 个具有不同名称的方法副本以允许这种并行性,但这是不可配置的。

4

2 回答 2

0

我不确定,但我认为 Reactive Messaging 不支持您的要求。

然而,你可以用另一种方式做你想做的事。我认为这也是使用消息传递的更好的整体模式。

http://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/amqp/amqp.html#amqp-inbound

找到带有 CompletionStage 和显式 ack() 的示例。该变体是异步的,因此如果将其与 Java 现有的并发设施结合使用,您将获得高效的并行处理。

我会将传入的工作发送给执行程序,然后在完成时执行任务 ack()。

于 2020-12-09T15:39:24.783 回答
0

我刚刚遇到了同样的情况,这就是规范打算如何让您处理并发: 来自 eclipse Microprofile 规范

基本上,而不是有一个具有这样的方法的类:

@Incoming("test-queue")
public void process(String input) {}

您有 2 个这样的课程:

@ApplicationScoped
public class MessageSubscriberProducer {

    @Incoming("test-queue")
    public Subscriber<String> createSubscriber() {
        return new SubscriberImpl();
    }
}

public class SubsciberImpl implements Subscriber<String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(4);  // this tells how many messages to grab right away
    }

    @Override
    public void onNext(String val) {
        // do processing
        this.subscription.request(1);  // grab 1 more
    }
}

这具有将处理代码从 vert.x 事件循环线程移动到工作线程池的额外优势。

于 2022-01-28T16:25:38.640 回答