问题标签 [micronaut-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
442 浏览

java - 使用 apache kafka 的反应式和非阻塞方法 Micronaut

我正在尝试从 Micronaut kafka 实现中获取非阻塞响应,但是返回值不起作用。

subscribe 方法不起作用,调试器永远不会到达这一点。我想从kafka监听器那里取回价值

卡夫卡制作人

卡夫卡监听器

非阻塞方法的 Micronaut 文档

https://docs.micronaut.io/1.0.0.M3/guide/index.html#_reactive_and_non_blocking_method_definitions

0 投票
0 回答
346 浏览

java - Rest api 设计与 apache kafka java 和 micronaut 应用程序

在此处输入图像描述

在使用 Micronaut 框架处理 Rest API CRUD 操作时,我有上图。我有单向流,控制器需要知道从 Kafka 消费者 API 执行的操作。

例如

  1. 要从数据库中获取所有项目列表,它是在消费者级别执行的
  2. 消费者级别的添加/更新/删除操作

我在消费者级别(侦听器)有以下 Micronaut 反应性代码,以从 mongo DB 获取产品列表

生产者界面

生产者的实现

来自 Micronaut 文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods

根据 Micronaut 文档接收和返回反应类型

在我的生产者实现中,我在订阅时从未收到任何值。

我想知道上述流程是否是使用 kafka 执行 REST API CRUD 操作的正确方法?

0 投票
1 回答
145 浏览

java - 使用 micronaut 2.1.3 接收和返回 Reactive Types apache kafka

我正在使用 Micronaut 应用程序通过 apache Kafka 接收和返回反应类型。来自 Micronaut 文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/

有一种接收和返回反应类型的方法,如下所示

此方法在消费者(侦听器端)

有没有使用这个方法的例子,返回值在Producer端会如何处理?

0 投票
0 回答
1079 浏览

apache-kafka - 使用多个 KStream/KTable 实现 Micronaut/Kafka-Streams 的最佳实践?

关于示例 Micronaut/Kafka Streams 应用程序有几个我不明白的细节。这是文档中的示例类(原始链接:https ://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStreams )。

我的问题是:

  • 为什么我们只返回源流?
  • 如果我们有多个源KStream对象,EG 做一个连接,我们是否也需要让它们成为 Bean?
  • 我们是否还需要将每个源都KTable设为 Bean?
  • 如果我们不创建源KStreamKTableBean 会发生什么?我们目前至少有一个项目可以做到这一点,但没有明显的问题。

编辑:这是我们服务的一个版本,具有多个流,经过编辑以删除识别信息。

0 投票
1 回答
295 浏览

java - 多次调用 Kafka Streams StreamBuilder.build() 可以吗

我们正在使用 micronaut/kafka-streams。使用此框架来创建流应用程序,您可以构建如下内容:

这:

  • 收到一个ConfiguredStreamBuilder(非常轻的包装纸StreamsBuilder
  • 构建并返回流(我们实际上不确定返回流的重要性,但这是一个不同的问题)。

ConfiguredStreamBuilder::build()(在 上调用相同的StreamsBuilder方法)稍后由框架调用,并且返回Topology的内容不可用于 Micronaut 的注入。

我们需要Topologybean 来记录拓扑的描述(通过Topology::describe)。

执行以下操作是否安全?

  • 调用ConfiguredStreamBuilder::build(因此StreamsBuilder::build)并使用返回的实例Topology来打印人类可读的描述。
  • 允许框架ConfiguredStreamBuilder::build稍后再次调用,并使用返回的拓扑的第二个实例来构建应用程序。
0 投票
0 回答
499 浏览

apache-kafka - Proto + Kafka Producer:将对象序列化为 JSON 时出错:直接自引用导致循环(通过引用链 unknownFields

堆栈:Micronaut-Kafka、Proto3、gRPC、BloomRPC

目标:接收来自 BloomRPC 的 grpc 请求并将其发布到 kafka 主题

上下文:我编写了一个 gRPC 端点,它成功接收了一个简单的调用,但是在尝试将此类对象发布到 Kafka 主题时,我得到了

我想我的问题与“......将对象序列化为 JSON 时出错”有关。我真的必须将这样的对象转换为 json 吗?我可以不把它发布到kafka主题吗?我在这里缺少什么?好吧,我可以通过创建一个新对象并根据请求对象设置每个属性来轻松解决这个问题,如下所示:

端点(控制器)方法

仅为转换创建的模型(这个解决方案似乎并不奇怪???)

所以,我的直截了当的问题是:我可以重用从 proto 文件产生的相同自动生成的存根作为模型来接收请求并将其发布到 kafka 主题吗?如果是这样,下面的代码有什么问题?如果没有,推荐的方法是什么?

完整代码:

事务.proto

事务端点(控制器)

交易服务(这里不是很重要)

卡夫卡制作人

构建.gradle

如果相关,这里是 proto 自动生成的存根的一部分

*** 第一版

*** 第二版

我尝试通过 application.yaml 添加序列化程序

gradle.build

现在我在 gradle build 期间得到

0 投票
1 回答
781 浏览

apache-kafka - 带有 auto.commit.enable=false 的 Micronaut-Kafka:如何手动提交偏移量

只有当它成功保存在数据库中时,我才想提交一条消息。

我知道我已经关闭了这个 application.yml 的自动提交

消费者

换句话说,在使用 Micronaut-Kafka 时,我如何使用 commitOffset 或 commitSync() 或任何其他替代方法手动提交消息?

*** 第二版

我回到了 application.yaml

*** 第三次编辑

我尝试添加 io.micronaut.configuration.kafka.Acknowledgement (已弃用)或 import io.micronaut.messaging.Acknowledgement 并且其中一个导致

看来我必须做些别的事情才能让 Micronaut 注入这样的 Acknowledgement 对象。我在下面缺少什么?

整个日志是

0 投票
1 回答
227 浏览

apache-kafka - Micronaut Kafka 阻塞

在启动应用程序后第一次调用在 Kafka 上发送消息后阻止创建一些 beanAbstractKafkaProducerConfiguration

任何想法如何处理。应用程序永远阻塞。

问候奥利弗

0 投票
1 回答
1214 浏览

apache-kafka - 从 Kafka Consumer 内部,如何获取消息头

简而言之,我想让消费者收听消息头,但我得到“未指定必需参数 [Header header]”。

这是我尝试失败的方式:

它打印此错误:

如果它添加了一些东西,这里是生产者:

发送带有标头抛出生产者的消息的服务

我期待打印... My-Header: myHeader

*** 编辑

对于未来的读者,我的解决方案变成:

应用程序.yml

制片人

消费者

0 投票
0 回答
181 浏览

callback - Kafka Producer:如何使用 Reactive 和 Non-Blocking Producer 编写 onSuccess/onError 回调代码

我正在尝试遵循Micronaut Kafka 指南。它显示了这段代码:

我尝试以这种方式实施但没有成功

制片人

并从服务层调用它

基本上,我想要的是使用 io.reactivex.Single 向 kafka“无阻塞”样式发布消息。我知道我必须订阅然后编写两个回调:onSuccess 和 onError。当然,我缺少一些关于 ReactiveX 的基本概念。请注意,任何线索将不胜感激。