问题标签 [smallrye-reactive-messaging]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
229 浏览

quarkus - Quarkus:如何与 Artemis 使用点对点消息传递?

我正在使用 quarkus 版本:1.5.2.Final以及以下依赖项以包含反应式消息传递(我在发送方和接收方项目之间有一个正在运行的 Artemis docker 实例):

在发件人项目上,我@Outgoing('stock-quote')在一个方法上有一个注释,该方法每秒产生一个随机数并返回Flowable<Message<String>>. 配置如下:

在receiver项目上,receiver所在的位置有一个带有注解@Incoming('stock)'.的方法配置如下:

我注意到了什么?一切正常。当接收方项目由于某种原因宕机时,发送方仍在发送的消息会被持久化,这很好。但是,当接收器再次联机时,接收器不会收到持久消息,而是在单独的“客户端线程”上启动。持久化的消息只是留在那里。

我还尝试@Broadcast在发件人端添加注释。由于我还希望有多个接收器实例,因此消息被分配给接收器。然后,当接收者下线时,消息将再次被持久化。当接收器恢复时,它会收到正在发送的新消息,但持久消息只是留在那里(在另一个“客户端线程”中,更糟糕的是,这个“客户端线程”上的持久消息正在累积(由于广播)。

那我想要什么?我希望有一个发送消息的发件人。这些消息被划分为两个或多个接收器实例。如果一个接收器出现故障,则消息将由其他实例处理。如果所有的接收者都挂了,则消息正在被持久化,一旦有一个或多个接收者再次在线,这些持久化的消息就会被处理。所以基本上我想要这个:

在此处输入图像描述

任何人都知道如何结合 quarkus 反应消息和 artemis 来做到这一点?

0 投票
1 回答
1033 浏览

java - Quarkus 使用 kafka 进行响应式消息传递

我有两个微服务,一个生产者和一个消费者。生产者每两秒向 kafka 主题写入一个数字增量。消费者有两个正在运行的实例,消耗这些增量。我注意到一些我想解决的奇怪问题:

  • 当还没有消费者,生产者开始生产时,消息被保存在 kafka 中。当消费者上线时,它不会处理生产者已经产生的已经存在的消息,而是开始消费现在进来的消息。消费者如何处理所有未消费的消息?
  • 当有两个消费者时,我希望两个消费者平等消费。现在只有一个消费者得到了所有的负载,而另一个只是坐在那里。如何将负载分散到消费者数量上?
  • 看起来 kafka 保存了所有正在生成的记录,即使它已经被消费者消费了。有什么办法可以防止这种情况发生吗?我找不到关于例如致谢的好信息。

有人知道这三个问题之一的答案吗?

消费者配置:

生产者配置:

0 投票
1 回答
737 浏览

java - 使用 SmallRye 反应式消息动态发布/订阅 MQTT

我们尝试使用 smallrye 响应式消息传递发布和订阅 MQTT 协议。我们设法通过以下简单代码将消息实际发布到特定主题/频道

我们想要做的是,只要我们想以generate()某种方式使用动态主题调用该方法,用户将在其中定义它。那是我们的问题,但后来我们从github的那个repo中找到了这些类。包裹名字io.smallrye.reactive.messaging.mqtt

例如,我们发现有一个类说它对 MQTT 代理(Mosquitto 服务器启动)进行发布调用。

在该语句中,SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false); 我们在“SendingMqttMessage(java.lang.String, java.lang.String, io.netty.handler.codec.mqtt.MqttQoS, boolean)”下面的红色下划线SendingMqttMessage<String>“io. smallrye.reactive.messaging.mqtt.SendingMqttMessage'。无法从外部包访问

更新(发布完成) 最后向 mqtt 代理(一个 mosquitto 服务器)发出了一个发布请求,所有这些都使用用户配置的动态主题。正如我们发现的,以前的 ClassSendingMqttMessage根本不应该被使用。我们发现我们还需要和发射器来实际发出带有动态主题的发布请求。

现在我们需要了解如何动态订阅主题。

0 投票
1 回答
713 浏览

java - Quarkus Kafka Streams/Reactive Messaging 反序列化异常

嘿,所以我正在尝试使用 Kafka Streams 和 MP Reactive Messaging 来读取 Kafka 主题,然后再返回给它。

卡夫卡流错误 -

Reactive Messaging 错误类似,但基本上消息被反序列化为的 POJO 看起来像这样 -

请注意,有一个默认的空构造函数和一个包含所有字段的构造函数。

但是,我猜它被称为“毒丸”,但是从 MQ 产生的一条带有“Aloha”有效负载的消息会破坏它,我无法反序列化它。我猜这是因为 'Aloha' 不被识别为字符串,因为它是单引号。我无权访问该数据的发送方式,因为它是通过 MQ 发送的。有没有办法跳过处理这个不可反序列化的消息并继续从主题处理?

0 投票
0 回答
249 浏览

java - NPE 消费 Kafka 主题

我遇到了一个令人生畏的 NPE,它使用 Quarkus 1.6.1 和 Smallrye 反应式消息传递框架来使用 Kafka 主题。有趣的是,它不会以较低的消息速率(约 100 条消息/秒)发生,它似乎发生在较高的消息速率 +300 条消息/秒时。测试环境是一个简单的 Quarkus Kafka 消息生产者和附加到同一主题并使用反应式注释(@Outgoing、@Incoming)的消费者。当我引导测试应用程序(mvnw compile quarkus:dev)时抛出错误。经过几次重试后,应用程序开始按预期工作。

请检查下面消耗 Kafka 记录的 bean。

开放JDK 11

0 投票
1 回答
520 浏览

apache-kafka - Quarkus 背压配置

我使用带有 kafka 的 quarkus smallrye 反应式消息传递得到以下堆栈跟踪:

所以我读过https://smallrye.io/smallrye-mutiny/#_how_do_i_control_the_back_pressure

根据文档,我添加了 BackPressure 控件。

前 :

后 :

现在一切都好了。

我的帖子的目的是了解我为什么需要这样做?

https://smallrye.io/smallrye-mutiny/apidocs/io/smallrye/mutiny/subscription/BackPressureStrategy.html

为什么缓冲区不能保留你?

如您所见,我每 5 秒(poolingInterval)调用一次简单的 sql 函数。该函数返回一些记录(通过池化从不超过 10 个)

所以流量非常低

请允许我用一些话来理解缓冲区管理。

谢谢

0 投票
1 回答
171 浏览

apache-kafka - Quarkus SmallRye Messaging Kafka Avro 本机编译失败

我正在使用带有 Confluent Registry 和 AVRO 的 SmallRye Reactive Messaging 和 Kafka。它工作正常,如本博客中所述https://quarkus.io/blog/kafka-avro/ 但它似乎不适用于与博客相关的源代码的本机编译:https ://github.com/cescoffier/quarkus -kafka-and-avro

我的环境(Avro 1.10.0 和 Confluent Registry)也有同样的问题。您将在此消息的末尾看到日志。

是否计划在本机支持上工作?或者可能是一个必须打开的问题?

备注:使用 Kafka Streams + Avro + Confluent Registry 的 quarkus avro 扩展在本机中运行良好。

谢谢。

[kafka-and-avro-1.0.0-SNAPSHOT-runner:96219] 分析:28 016,32 ms,2,72 GB 错误:应该在运行时初始化的类在图像构建期间被初始化:me.escoffier.quarkus .Movie 要求在运行时初始化该类(来自功能 io.quarkus.runner.AutoFeature.beforeAnalysis)。要查看 me.escoffier.quarkus.Movie 被初始化的原因,请使用 -H:+TraceClassInitialization org.apache.avro.generic.GenericDatumReader 要求在运行时初始化该类(来自功能 io.quarkus.runner.AutoFeature.beforeAnalysis) . 要了解 org.apache.avro.generic.GenericDatumReader 被初始化的原因,请使用 -H:+TraceClassInitialization

com.oracle.svm.core.util.UserError$UserException:应该在运行时初始化的类在图像构建期间被初始化:me.escoffier.quarkus.Movie 要求在运行时初始化该类(来自功能 io. quarkus.runner.AutoFeature.beforeAnalysis)。要查看 me.escoffier.quarkus.Movie 被初始化的原因,请使用 -H:+TraceClassInitialization org.apache.avro.generic.GenericDatumReader 要求在运行时初始化该类(来自功能 io.quarkus.runner.AutoFeature.beforeAnalysis) . 要了解 org.apache.avro.generic.GenericDatumReader 被初始化的原因,请使用 -H:+TraceClassInitialization

0 投票
1 回答
219 浏览

apache-kafka - 代理不可用后自动重启 Quarkus 微服务

我有一个非常简单的 Quarkus 微服务,它使用 smallrye 反应式消息传递 (kafka)。有时我的卡夫卡经纪人出现故障,我得到以下日志:

重新启动代理后,我必须手动重新启动我的微服务。是否可以在不进行任何手动操作的情况下为微服务添加重新使用新传入消息的功能?

谢谢!

0 投票
1 回答
397 浏览

quarkus - 在 Quarkus 中启动后台进程

我需要在使用 Quarkus 编写的微服务中使用来自 RabbitMQ 的消息。我尝试对 Quarkus 使用smallrye-reactive-messaging,但遇到了两个问题:

  1. 它仅支持 AMQP 1.0,不适用于 RabbitMQ(即使我使用的是实验性的 AMQP 1.0 插件)。
  2. 它适用于 ActiveMQ Artemis,但还有另一个问题:smallrye-reactive-messaging 是...响应式,这很好,但现在没有时间将我的数据库代码重写为响应式。处理消息意味着将数以万计的文档持久化到 mongodb 中,这可能需要几分钟,而且似乎阻塞了整个服务器:

警告 [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) 线程 Thread[vert.x-eventloop-thread-0,5,main]=Thread[vert.x-eventloop-thread-0 ,5,main] 已被阻塞 212088 毫秒,时间限制为 2000 毫秒:io.vertx.core.VertxException: 线程阻塞

所以我的解决方法是在 Quarkus 启动时启动一个线程来消费和处理消息。Quarkus支持调度定期任务是否有后台进程的注释或者我必须编写自己的扩展

0 投票
0 回答
63 浏览

quarkus - 从 Kafka 消费时 JPA 事务成功后的确认消息

在 Quarkus 应用程序中,我想使用 Kafka 消息并使用实体管理器将其信息保存在数据库中。

这是我到目前为止得到的:

但是使用这种方法,消息会在记录实际保存在数据库中之前得到确认。如果 JPA 事务成功,有没有办法只确认消息?