问题标签 [smallrye-reactive-messaging]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - Quarkus + Kafka + Smallrye 异常处理
如何使用 quarkus + kafka + smallrye 处理流处理异常?
我的代码与 quarkus 指南 ( https://quarkus.io/guides/kafka#imperative-usage )上的命令式生产者示例非常相似
我想要类似于 vanilla Kafka 库的东西,它提供了处理请求发送的每条记录的回调的选项。
Tks
apache-kafka - 如何逐条消息向Kafka发送消息
我是响应式编程的新手,我尝试实现一个非常基本的场景。每次将文件拖放到特定文件夹时,我都想向 kafka 发送一条消息。我认为我不太了解基础知识...所以请您帮帮我吗?
所以我有几个问题: smallrye-reactive-messaging 和 smallrye-reactive-streams-operators 有什么区别?
我有这个简单的代码:
当代码进入 if 语句时,一切正常,我的对象的 JSON 序列化将为空值。但是我不明白为什么当我的代码转到 else 语句时,没有任何话题?似乎 if 语句的 .of 指令破坏了流或类似的东西......
如何保持对新删除的文件“做出反应”的连续流?(或其他事件,如 HTTP GET 请求或类似的东西)......
例如,如果我不返回 PublisherBuilder 的实例,而是返回 Integer,那么我的 kafka 主题将由非常庞大的 Integer 值流填充。这就是为什么示例在发送消息时使用一些间隔......
我应该使用一些 CompletationStage 还是 CompletableFuture ?RXJAva2?使用哪个库有点令人困惑(vertx、smallrye、rxjava2、microprofile、...)
之间有什么区别:
- ReactiveStreams.fromCompletionStage
- ReactiveStreams.fromProcessor
- ReactiveStreams.fromPublisher
- ReactiveStreams.fromSubscriber
在哪种情况下使用哪个?
非常感谢 !
quarkus - Quarkus / Smallrye 反应式消息传递 - 消息重新传递
我目前正在研究 Quarkus 中的 Smallrye Reactive Messaging 集成。乍一看,发送和接收消息非常简单和优雅。
但是我没有发现的一件事是:如何处理重新发送消息?
示例:我们收到一条消息并尝试处理它。发生了一些异常(可能是数据库不可用或乐观锁异常等)。在这种情况下,我会抛出一个异常,这样消息就不会被确认。但目前我看不到消息是如何重新传递的。
我建立了一个小的虚拟项目来测试这个:
- 夸库斯
- ActiveMQ 阿尔忒弥斯
- 将消息(通过 Artemis 控制台)发送到队列
中——队列配置为 max redelivery = 3 - 使用 Quarkus / Smallrye Reactive Messaging @Incoming 注解接收消息
- @Incoming 方法抛出异常
--> 消息从 Artemis 队列中移除
--> @Incoming 方法只被调用一次
如果我关闭 Quarkus 应用程序,可以在 Artemis 队列中再次看到该消息,并将重新传递标志设置为 true。
但是我找不到如何在 Smallrye 反应式消息传递中管理/配置重新传递,以便该层处理消息的重新传递 n 次,并在最大重试次数后将消息放入 DLQ。
有没有办法做到这一点?
quarkus - 在 quarkus 中发送带有标头的 amqp 消息
我想使用 Quarkus 在 AMQP 消息中将 Java(以及 Kotlin)POJO 作为 JSON 发送到 RabbitMQ。
在兔子队列上,消息作为 base64 编码字节流接收。
如何在此处设置标题以将内容类型放入其中?像 TTL 这样的标头设置也可能很有趣。
rest - 使用 io.smallrye.mutiny.Uni 创建响应对象
我正在尝试在 Quarkus 框架上学习使用 ReactiveMongoClient。
我以 Uni> 的身份发送回复部分成功
但是,当我尝试让某个其他类(StaffResponse)的对象包含一个用于分页的 Link 对象时,我没有得到任何 Staff 记录。(现在我已经硬编码了分页链接)
响应中的“员工”为空。
MongoClient 正在返回 Staff 列表,看起来 Response 对象没有获取列表。尝试阅读 SmallRye Mutiny 文档 - 无法解决。
请帮忙。
我已在以下位置提交代码:https ://github.com/doepradhan/staffApi 和一个示例 json 数据文件(https://github.com/doepradhan/staffApi/blob/master/sample-staff-data.json)
谢谢你的帮助。
java - Quarkus 消息消费者并发(多线程)
在 JMS 世界中,您可以通过调整消息驱动 bean (MDB) 中的maxSession属性轻松定义并发度。现在 Quarkus 显然没有 MDB,但是如何实现相同级别的并发调优呢?
我查看了指南(Quarkus - Using JMS),但示例中的所有消费者所做的都是使用无限循环获取消息。我认为这是一个非常糟糕的例子。我了解语法,但它使用消息的方式对我来说似乎很忙。
所以我想也许我没有抓住重点,而响应式消息传递是更有效地处理更多消息的方法。我查看了Quarkus 指南 - Using AMPQ with Reactive messages但是它再次没有说明消费者的并发性。在最后一个示例中,Price Generator bean 是一个应用程序范围的 bean,它使用响应式消息注释从 Artemis 代理消费消息,这很酷,但是并发呢?
如果我希望传入通道(或 JMS 队列)的消耗速度比其他通道快,那么如何在 Quarkus 上实现呢?你能分享一个例子吗?
quarkus - 重新启动响应式消息传递,例如在重新配置之后
如何重新启动或停止/恢复响应式消息传递,例如在更改间隔时间之后?此示例来自 Quarkus 指南:https ://quarkus.io/guides/kafka-streams
java - 单如何在 REST 调用中获得失败或成功响应
在服务器发送电子邮件后,我正在尝试设置简单的成功/失败响应。
但是,即使经过数小时的尝试多种变体,我仍然没有得到正确的响应。
只是给出一个接受的响应的示例代码在这里:
但是,当邮件没有成功发送时,我想在这里提供另一个回复。我试过的是这个(但这是一个没有成功的 Uni 演员):
控制台输出(由于密码错误没有发送邮件时):
还有另一个选择:
控制台日志(带有 println)。它在我收到接受的客户端输出后执行。
客户端输出(由于密码错误没有发送邮件时):
但是机器人没有成功。我只想接收邮件是否已发送或发送时是否有任何错误。有人对如何进行有任何想法提示吗?
java - Quarkus 从 Kafka 主题中提取并将 JSON 有效负载发送到 REST 端点
因此,我将 Quarkus 与 Microprofile Reactive Messaging 框架(带有 SmallRye Kafka 连接器)和 RxJava2 Flowable 流对象一起用于响应式消息接收/发送。我有一个微服务,它使用 @Incoming 和 @Outgoing 注释来正确使用通道从后面的主题中提取并将消息推送到主题。
但是,现在我想修改它,以便我仍然可以从 Kafka 主题中提取数据,然后将 JSON 有效负载发送到 REST 端点。据我所知,没有与 Quarkus HTTP 兼容的 SmallRye 连接器。有没有人碰巧知道任何方法可以让它工作?
示例函数
apache-kafka - Quarkus 应用程序中 Kafka 反序列化器中的 CDI 上下文
我有一个基于 Kafka 的带有 Smallrye 反应式消息传递的 Quarkus 项目。因为我想使用“复杂的 pojo”,所以我需要一个自定义的反序列化器。
我想制作这两个类 CDI bean,这样我就可以注入和使用我的自定义记录器,它是一个 CDI bean。有没有办法做到这一点?
现在我注入的记录器对象只是空的: