问题标签 [smallrye-reactive-messaging]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
485 浏览

amqp - ActiveMQ Artemis:从 smallrye-reactive-messaging (AMQP) 以编程方式创建队列

我的 Quarkus 微服务使用 smallrye 反应式消息传递库中的 AMQP 连接器向从vromero/activemq-artemis:2.16.0-alpineDocker 映像运行的 ActiveMQ Artemis 代理生成消息。反应式消息传递库文档提到了使用动态地址名称的可能性。我在我的 REST 资源中使用以下 (Kotlin) 代码:

连接器定义在application.properties

ActiveMQ Artemis 确实my-custom-address动态地创建了地址,但是它没有创建任何绑定到它的队列,并且消息最终被取消路由

配置broker.xml文件包含在core部分

我尝试将队列名称与地址一起传递

但这没有任何区别。

以编程方式创建的队列和传递给它的消息缺少什么?另外,为什么 Artemis 在消息丢失(未路由)时会确认消息?

更新:附上来自 Artemis 网络界面的屏幕截图 在此处输入图像描述

0 投票
1 回答
668 浏览

quarkus - 如何处理错误 - SRMSG00034:发出项目的下游请求不足

我正在对 Quarkus 开发的一个简单应用程序进行负载测试。应用程序将 http 请求代理到另一个 http 服务。该应用程序使用org.eclipse.microprofile.reactive.messaging.Emitter和 o rg.eclipse.microprofile.reactive.messaging.Channel

如果我将请求速率推到 300 req/sec,我会遇到错误。我试图了解错误SRMSG00034:发出项目的下游请求不足以及如何解决它。任何帮助,将不胜感激。

0 投票
0 回答
28 浏览

cdi - 如何刷新并发送请求以使用微配置响应式消息获取数据

我正在尝试制作一个 webapp,并且我想在刷新 web 时发送一条消息(这是我制作的一个简单示例,因为它更容易理解)。我尝试过使用内部渠道,但我也尝试过使用 Kafka,但也没有用。

失败消息是“未找到频道“通知刷新”的订阅者”。当我用 kafka 尝试它时没有错误,但它没有检索到消息。

我的资源:它返回发布者是因为主应用程序具有的功能而不是这个可重现的示例,但这不是问题。

然后这个类接受消息,进行一些更改并发送到处理器:

返回资源的处理器:

0 投票
1 回答
67 浏览

quarkus - 在 quarkus 中为 AWS Glue Schema Registry 配置反应式消息传递

将 AWS Glue 架构注册表与 quarkus 反应式消息集成时出现问题。我有一个属性定义为:

注意 schemaName 中的驼峰式。Glue 模式注册表正在寻找 schemaName 的值,但从日志输出中,quarkus 似乎将该属性以全小写形式作为 schemaname 输出,因此添加其他 kafka 属性的默认方法不起作用。

有没有办法在属性文件中维护骆驼外壳,或者有其他方法可以将 kafka 属性添加到应用程序中。

谢谢

0 投票
1 回答
196 浏览

java - 如何从 quarkus 应用程序中将墓碑消息正确发布到压缩的 kafka 主题?

Quarkus应用程序中,我需要将 tombstone 消息发布到压缩的 Apache Kafka 主题。由于我的用例是必要的,因此我使用 anEmitter来向主题发送消息(如 quarkus 博客中所建议的那样)。非墓碑消息(带有效负载)的代码是:

我希望这些工具可以让我制作一个带有Emitter有效载荷的墓碑消息。不幸的是,工厂方法的所有实现,允许提供元数据(需要提供消息密钥),指定有效负载,不能是 {@code null}<M extends Message<? extends T>> void send(M msg)MessagenullMessage.of(..)

使用Emitter

0 投票
1 回答
124 浏览

java - Quarkus 有什么方法可以控制来自 Kafka 的 @incomming 消息

我是 quarkus 的新手,我想听一个 kafka 主题,但仅限于某些环境。

我会用这个话题做一些mock,生产环境不想听。有没有办法不基于变量运行@Incoming?

感谢帮助。

0 投票
2 回答
457 浏览

java - Quarkus/SmallRye Reactive Messaging/Kafka 连接多个集群

目前我有一个 Quarkus 应用程序,它从一个 Kafka 主题消费并在另一个 Kafka 主题上产生。它使用 SmallRye 反应式消息传递。它运作良好。由于外部变化,要生成的主题和要使用的主题将位于不同集群上的 Kafka 服务器上(不应/不能组合在一个集群中)。

在应用程序配置(yaml)中,我们设置了 Kafka 服务器(代理):

在此处添加服务器无济于事,然后它会尝试将数据传播到代理上,这不是我的意图。

是否可以连接到多个集群(可能为每个主题设置一个服务器)?在互联网上找不到任何关于Quarkus 文档SmallRye 文档的内容

0 投票
1 回答
275 浏览

apache-kafka - Quarkus Kafka Partitions 配置似乎只处理其中一个分区

我正在尝试增加消费者的数量以匹配我们正在读取的 Kafka 主题的分区数量。有三个分区,所以我将传入消息的分区配置为三个,如下所示:

但是,我已经运行该应用程序一段时间了,似乎该应用程序只处理分区 0 中的消息,而不处理分区 1 和 2 中的消息。我在日志中看到它创建了三个消费者。

但它似乎在处理分区 0 中的消息:

下面是监听器类的代码片段:

这是小黑麦卡夫卡的错误吗?

0 投票
0 回答
70 浏览

apache-kafka - 如何使用带有 Smallrye 反应式消息(发射器)的 Kafka 向各种订阅者广播

我正在开发一个应用程序,它使用发射器向另一个使用 Kafka 连接器的应用程序中的各种订阅者发送消息。但我的问题是消息只到达一个订阅者而不是所有订阅者。我已经用 mp.messaging.outgoing.channel.broadcast=true 配置了我的频道,并且我的发射器也用“@Broadcast”注释。所以它应该到达他们所有的订户。我不知道我是否需要配置其他东西。

谢谢。

0 投票
0 回答
285 浏览

kafka-producer-api - 在 quarkus 应用程序中未找到异常 smallrye-kafka 生产者

有一个基于 quarkus 的简单 poc 向 kafka 主题发送 avro 消息。

以下是已加载的配置 -

acks = 1
batch.size = 16384
bootstrap.servers = [kaas-test-dc-a.company.com:443]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = kafka-producer-cpe-reply
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org. apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
元数据.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer .bytes = 32768
reconnect.backoff.max.ms = 10000
reconnect.backoff.ms = 50
request.timeout.ms = 30000次
重试 = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl。 jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min。 period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
socket .connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification .algorithm = https
ssl.engine.factory.class = null
ssl.key.password = [隐藏]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = C:/keystore.jks
ssl.keystore.password = [隐藏]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = C:/truststore.jks
ssl.truststore.password = [隐藏]
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class io.apicurio.registry.utils.serde。 AvroKafkaSerializer

以下是我看到的错误 -
2021-06-29 23:36:59,012 INFO [org.apa.kaf.com.uti.AppInfoParser](Quarkus 主线程)Kafka 版本:2.7.0 2021-06-29 23:36:59,012 INFO [org.apa .kaf.com.uti.AppInfoParser](Quarkus 主线程)Kafka commitId:448719dc99a19793 2021-06-29 23:36:59,015 INFO [org.apa.kaf.com.uti.AppInfoParser](Quarkus 主线程)Kafka startTimeMs: 1625024218998 2021-06-29 23:36:59,757 INFO [io.quarkus](Quarkus 主线程)aru-unet-Service-replyws 1.0.0 on JVM(由 Quarkus 1.13.7.Final 提供支持)开始于 33.944 秒。收听:http://localhost:8080 2021-06-29 23:36:59,758 INFO [io.quarkus](Quarkus 主线程)配置文件开发已激活。激活实时编码。2021-06-29 23:36:59,761 INFO [io.quarkus](Quarkus 主线程)已安装的功能:[camel-support-retrofit、cdi、kotlin、mutiny、quarkiverse-apicurio-registry-client、resteasy、resteasy-jackson ,

希望对上述异常有所帮助。