问题标签 [smallrye-reactive-messaging]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
515 浏览

quarkus - 如何对返回 Smallrye 叛变反应库的 Uni/Multi 的方法进行单元测试?

我在我的Quarks 应用程序中使用 Smallrye Mutiniy 反应库,因为它在 Quarks 应用程序中本机支持。

我正在尝试为服务类编写单元测试。我不确定如何为返回Uni / Multi的方法编写单元测试。

返回的方法Uni<String>

为上述方法实现的单元

控制台日志

0 投票
1 回答
231 浏览

java - quarkus.uuid 作为表达式在 application.properties 中不起作用

编写了一个程序,使用 smallrye-kafka-messaging 读取 kafka 主题。现在我需要 group.id 属性在服务运行/重新启动时是随机的,所以我在 quarkus 项目中使用了这个属性

mp.messaging.incoming.incoming_topic_name.group.id=${quarkus.uuid}

${quarkus.uuid}在 quarkus 指南中提到用于在 application.properties 中生成随机 uuid

但是当在应用程序中使用此属性时,会出现此异常

也试过这个 - 删除了 group.id 属性,所以 kafka 使用quarkus.application.name作为消费者组 ID,所以即使在删除应用程序名称属性之后,quarkus 也使用服务名称作为消费者组 ID。

还有其他方法可以将 kafka 侦听器的 group.id 设置为随机吗?

0 投票
1 回答
210 浏览

apache-kafka - Quarkus - 基于 kafka write ack 和 nack 的状态响应

我有一个将数据推送到 kafka 的端点。现在,我想分别用适当的状态码 2xx 或 5xx 来响应调用,以防 kafka 写入成功或失败。代码片段是

现在的问题是端点在执行 ack 或 nack 回调之前响应状态码。也尝试了的sendAndAwait方法,MutinyEmitter但该方法返回无效。所以没有办法知道消息是 acked 还是 nacked。

0 投票
0 回答
129 浏览

java - Quarkus:当与带有消息签名的处理器方法一起使用时,忽略响应式消息传递的失败策略会引发 CIRCULAR REFERENCE 错误

我正在使用 Quarkus v2.3.0 和 Kafka 扩展进行响应式消息传递。我复制了这篇博文中给出的忽略失败策略示例https://quarkus.io/blog/kafka-failure-strategy/#the-ignore-strategy

我尝试使用 Message<> 包装器签名将其应用于前导方法(使用 @Incoming 和 @Outoging)。但是当抛出异常时,我得到以下错误并且流肯定停止,这不是忽略失败策略的预期行为:

这是处理器方法:

相同的代码正在使用有效负载签名:

记录了异常,但忽略了错误,并且电影主题的消费按预期继续。

我错过了什么 ?为什么不忽略异常?

0 投票
0 回答
118 浏览

apache-kafka - 在使用 quarkus 和 opentracing 通过 Kafka 传递消息时维护原始 traceId

我正在尝试使用两个通过 Kafka 通信并使用 opentracing 进行跟踪的 Quarkus (2.4.1.Final) 微服务​​(生产者和消费者)创建最基本的工作示例。

我遵循了kafkaopentracing教程,在开发模式下运行了生产者和消费者(因此他们创建了 redpanda kafka 代理),然后尝试发出 POJO 并在消费者和生产者中记录 traceId。据我了解,这应该是开箱即用的。

POJO 可以顺利发送、序列化和反序列化。消费者收到的 kafka 消息头甚至使用该uber-trace-id字段注入了正确的原始跟踪和跨度 ID(我已经检查过调试生产者和消费者) 。

但是,由于某种原因,在记录跟踪 ID 时,它们不匹配。这就像跟踪上下文“忘记”了它通过 kafka 接收到的跨度。请注意,我的业务需求是只打印每个日志的 traceId,以便我们可以通过 kibana 跟踪打印的日志。

制作人:

消费者:

POJO:

生产者的应用配置:

和消费者:

串行器/解串器

依赖项(两者相同):

基本用例:

将 http://localhost:8090/hello/John 放入浏览器。您将在生产者处看到日志:

而在消费者

请注意跟踪 ID 不同。我不确定我还应该做什么/配置...

0 投票
0 回答
57 浏览

java - Quarkus,如何发送带有保留标志的 Mqtt 消息?

我目前正在使用 smallrye 反应消息发送带有 Quarkus 的 Mqtt 消息,问题是当我将保留标志添加为 true 时,没有任何变化。

这是我的做法:

任何帮助或指导将不胜感激

0 投票
0 回答
43 浏览

quarkus - 最新的 Quarkus 版本 (2.6.3) 反应式消息传递错误

我最近升级到了最新的 Quarkus 版本,并且似乎在 Smallrye 的反应式消息传递的配置方面遇到了一些问题。在 2.6.3 之前,连接器一直是smallrye-kafka,但是当我运行我的应用程序时,我现在似乎遇到了一个奇怪的错误,因为它无法将其识别为已知的 Kafka 连接器?

错误:

这是我的配置(出于演示目的省略了实际值):

我可以从官方文档中收集到的唯一信息是一些响应式消息传递包被拆分,但没有提及此连接器更改(看到我确定它没有更改)。

0 投票
0 回答
70 浏览

jdbc - 如何使用响应式消息将阻塞代码合并到 Mutiny 中

我想就如何正确实施这一点以及这是否是正确的方法获得意见。因此,我使用反应式消息创建了一个完全反应式的应用程序,但我知道我有一个总体约束——那个约束是 Oracle。如您所知,JDBC 本质上是阻塞的,不能真正异步执行。我试图找出一种方法来实现这一点,这样事件循环线程就不会被阻塞,到目前为止,唯一可行的事情是这样的:

我已经设法让它按原样工作,但我不确定这是否是正确/有效的方式。有没有人对如何完成这样的任务有任何建议?任何建议都非常感谢和考虑,谢谢!

0 投票
1 回答
47 浏览

java - 反应式消息传递 - ClassCastException 不能转换为 io.smallrye.mutiny.Multi 类

运行此方法时会引发ClassCastException

consume has thrown an exception: java.lang.ClassCastException: class java.lang.String cannot be cast to class io.smallrye.mutiny.Multi

我可以消费Multi<String>吗?

查看SmallRye Reactive Messaging > Development Model > Consuming Payloads似乎我一次只能使用一条消息。

0 投票
0 回答
22 浏览

logging - 更改 smallrye kafka 消费者扩展的日志级别

我有一个使用quarkus-smallrye-reactive-messaging-kafka扩展的 quarkus 应用程序。

在运行时它会产生大量的调试日志:

我试图在中设置不同的日志级别,application.properties但它不起作用:

我该如何解决?