问题标签 [smallrye-reactive-messaging]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1740 浏览

java - Quarkus + Kafka + Smallrye 异常处理

如何使用 quarkus + kafka + smallrye 处理流处理异常?

我的代码与 quarkus 指南 ( https://quarkus.io/guides/kafka#imperative-usage )上的命令式生产者示例非常相似

我想要类似于 vanilla Kafka 库的东西,它提供了处理请求发送的每条记录的回调的选项。

Tks

0 投票
1 回答
354 浏览

apache-kafka - 如何逐条消息向Kafka发送消息

我是响应式编程的新手,我尝试实现一个非常基本的场景。每次将文件拖放到特定文件夹时,我都想向 kafka 发送一条消息。我认为我不太了解基础知识...所以请您帮帮我吗?

所以我有几个问题: smallrye-reactive-messaging 和 smallrye-reactive-streams-operators 有什么区别?

我有这个简单的代码:

当代码进入 if 语句时,一切正常,我的对象的 JSON 序列化将为空值。但是我不明白为什么当我的代码转到 else 语句时,没有任何话题?似乎 if 语句的 .of 指令破坏了流或类似的东西......

如何保持对新删除的文件“做出反应”的连续流?(或其他事件,如 HTTP GET 请求或类似的东西)......

例如,如果我不返回 PublisherBuilder 的实例,而是返回 Integer,那么我的 kafka 主题将由非常庞大的 Integer 值流填充。这就是为什么示例在发送消息时使用一些间隔......

我应该使用一些 CompletationStage 还是 CompletableFuture ?RXJAva2?使用哪个库有点令人困惑(vertx、smallrye、rxjava2、microprofile、...)

之间有什么区别:

  • ReactiveStreams.fromCompletionStage
  • ReactiveStreams.fromProcessor
  • ReactiveStreams.fromPublisher
  • ReactiveStreams.fromSubscriber

在哪种情况下使用哪个?

非常感谢 !

0 投票
0 回答
303 浏览

quarkus - Quarkus / Smallrye 反应式消息传递 - 消息重新传递

我目前正在研究 Quarkus 中的 Smallrye Reactive Messaging 集成。乍一看,发送和接收消息非常简单和优雅。

但是我没有发现的一件事是:如何处理重新发送消息?

示例:我们收到一条消息并尝试处理它。发生了一些异常(可能是数据库不可用或乐观锁异常等)。在这种情况下,我会抛出一个异常,这样消息就不会被确认。但目前我看不到消息是如何重新传递的。

我建立了一个小的虚拟项目来测试这个:

  • 夸库斯
  • ActiveMQ 阿尔忒弥斯
  • 将消息(通过 Artemis 控制台)发送到队列
    中——队列配置为 max redelivery = 3
  • 使用 Quarkus / Smallrye Reactive Messaging @Incoming 注解接收消息
  • @Incoming 方法抛出异常
    --> 消息从 Artemis 队列中移除
    --> @Incoming 方法只被调用一次

如果我关闭 Quarkus 应用程序,可以在 Artemis 队列中再次看到该消息,并将重新传递标志设置为 true。

但是我找不到如何在 Smallrye 反应式消息传递中管理/配置重新传递,以便该层处理消息的重新传递 n 次,并在最大重试次数后将消息放入 DLQ。

有没有办法做到这一点?

0 投票
0 回答
249 浏览

quarkus - 在 quarkus 中发送带有标头的 amqp 消息

我想使用 Quarkus 在 AMQP 消息中将 Java(以及 Kotlin)POJO 作为 JSON 发送到 RabbitMQ。

在兔子队列上,消息作为 base64 编码字节流接收。

如何在此处设置标题以将内容类型放入其中?像 TTL 这样的标头设置也可能很有趣。

0 投票
1 回答
3078 浏览

rest - 使用 io.smallrye.mutiny.Uni 创建响应对象

我正在尝试在 Quarkus 框架上学习使用 ReactiveMongoClient。

我以 Uni> 的身份发送回复部分成功

但是,当我尝试让某个其他类(StaffResponse)的对象包含一个用于分页的 Link 对象时,我没有得到任何 Staff 记录。(现在我已经硬编码了分页链接)

响应中的“员工”为空。

MongoClient 正在返回 Staff 列表,看起来 Response 对象没有获取列表。尝试阅读 SmallRye Mutiny 文档 - 无法解决。

请帮忙。

我已在以下位置提交代码:https ://github.com/doepradhan/staffApi 和一个示例 json 数据文件(https://github.com/doepradhan/staffApi/blob/master/sample-staff-data.json

谢谢你的帮助。

0 投票
0 回答
860 浏览

java - Quarkus 消息消费者并发(多线程)

在 JMS 世界中,您可以通过调整消息驱动 bean (MDB) 中的maxSession属性轻松定义并发度。现在 Quarkus 显然没有 MDB,但是如何实现相同级别的并发调优呢?

我查看了指南(Quarkus - Using JMS),但示例中的所有消费者所做的都是使用无限循环获取消息。我认为这是一个非常糟糕的例子。我了解语法,但它使用消息的方式对我来说似乎很忙。

所以我想也许我没有抓住重点,而响应式消息传递是更有效地处理更多消息的方法。我查看了Quarkus 指南 - Using AMPQ with Reactive messages但是它再次没有说明消费者的并发性。在最后一个示例中,Price Generator bean 是一个应用程序范围的 bean,它使用响应式消息注释从 Artemis 代理消费消息,这很酷,但是并发呢?

如果我希望传入通道(或 JMS 队列)的消耗速度比其他通道快,那么如何在 Quarkus 上实现呢?你能分享一个例子吗?

0 投票
1 回答
45 浏览

quarkus - 重新启动响应式消息传递,例如在重新配置之后

如何重新启动或停止/恢复响应式消息传递,例如在更改间隔时间之后?此示例来自 Quarkus 指南:https ://quarkus.io/guides/kafka-streams

0 投票
1 回答
2117 浏览

java - 单如何在 REST 调用中获得失败或成功响应

在服务器发送电子邮件后,我正在尝试设置简单的成功/失败响应。

但是,即使经过数小时的尝试多种变体,我仍然没有得到正确的响应。

只是给出一个接受的响应示例代码在这里:

但是,当邮件没有成功发送时,我想在这里提供另一个回复。我试过的是这个(但这是一个没有成功的 Uni 演员):

控制台输出(由于密码错误没有发送邮件时):

还有另一个选择:

控制台日志(带有 println)。它在我收到接受的客户端输出后执行。

客户端输出(由于密码错误没有发送邮件时):

但是机器人没有成功。我只想接收邮件是否已发送或发送时是否有任何错误。有人对如何进行有任何想法提示吗?

0 投票
1 回答
446 浏览

java - Quarkus 从 Kafka 主题中提取并将 JSON 有效负载发送到 REST 端点

因此,我将 Quarkus 与 Microprofile Reactive Messaging 框架(带有 SmallRye Kafka 连接器)和 RxJava2 Flowable 流对象一起用于响应式消息接收/发送。我有一个微服务,它使用 @Incoming 和 @Outgoing 注释来正确使用通道从后面的主题中提取并将消息推送到主题。

但是,现在我想修改它,以便我仍然可以从 Kafka 主题中提取数据,然后将 JSON 有效负载发送到 REST 端点。据我所知,没有与 Quarkus HTTP 兼容的 SmallRye 连接器。有没有人碰巧知道任何方法可以让它工作?

示例函数

0 投票
1 回答
93 浏览

apache-kafka - Quarkus 应用程序中 Kafka 反序列化器中的 CDI 上下文

我有一个基于 Kafka 的带有 Smallrye 反应式消息传递的 Quarkus 项目。因为我想使用“复杂的 pojo”,所以我需要一个自定义的反序列化器。

我想制作这两个类 CDI bean,这样我就可以注入和使用我的自定义记录器,它是一个 CDI bean。有没有办法做到这一点?


现在我注入的记录器对象只是空的: