问题标签 [reactor-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
64 浏览

spring - 在 Reactor Kafka 和 Spring 集成中没有创建订阅

我正在尝试使用 Spring Integration 和 Project Reactor 创建一个简单的流程,在其中我使用 Reactor Kafka 使用记录,将它们传递到一个通道,从那里它将使用 Reactor Kafka 将消息生成到另一个主题。

消费流程为:

生产流程为:

我想知道我应该如何以及在哪里执行订阅。

编辑:

我添加了一个.subscripe(),它仍然不起作用:

示例代码:

0 投票
0 回答
33 浏览

apache-kafka - Reactive Kafka Receiver 和并行记录处理流程

我正在尝试使用 reactor-kafka 处理来自 Kafka 的批量记录,如下所示:

使用 max.poll.records = 5。它有效,我可以在日志中看到:

即每批处理完后执行poll

出现问题时,当我尝试并行处理记录时,将publishOn添加到上面的代码中:

在这种情况下,看起来 poll 是独立执行的:

如您所见,在处理批处理之前轮询记录。理想的行为是仅在当前批次完全处理后轮询 Kafka 以获取下一个批次,但同时处理批次记录。我怎样才能做到这一点?

感谢任何帮助或提示

UPD。 经过一番激烈的斗争,我想我找到了解决办法:

这完美地工作但是现在有下一个问题 - 如果我将receiveExactlyOnce转换为热发布者(即添加.publish(1)),会发生类似的错误,onNext 和批处理开始独立工作

更新 2如果您使用 prefetch = 1,则publish没有问题。但是您确实会得到一些“轮询转发”行为(我认为这是术语“预取”所假设的),但是轮询率实际上是由批处理速度控制的。

问题解决了。

0 投票
1 回答
116 浏览

spring-boot - 广播到使用 springboot webflux 创建的多个反应式 WebSocketSession 不起作用

以下是场景:

  1. 我创建了一个反应堆 kafka 接收器
  2. 从 kafka 接收器消耗的数据被发布到 WebSocketHanlder
  3. WebSocketHanlder 使用 SimpleUrlHandlerMapping 映射到 URL
  4. URL 模式是 api/v1/ws/{ID},我希望根据 URI 中使用的不同 ID 创建多个 WebSocketSession,这些 ID 由单个 WebSocketHanlder 管理,这实际上正在发生
  5. 但是当来自 kafka 接收器的数据发布时,只有第一个创建的 WebSocketSession 接收到它,所有其他 WebSocketSession 都没有接收到数据
  6. 我正在使用带有 starter-tomcat 的 spring-boot 2.6.3

如何将数据发布到创建的所有 WebSocketSessions 我的代码:

Web 套接字处理程序的配置

WebSocket 处理程序的代码

卡夫卡接收器

POM 依赖项

我还尝试将 WebSocketHanlder 中的 handle(...) 方法定义更改为以下,但仍然来自 kafka 的数据仅推送到一个 websocket 会话:

另外,我尝试了以下操作:

我启用了 log() 并得到以下输出:

在这里,我们可以看到我们有 2 个 reactor-kafka Flux 订阅者:

  1. reactor.Flux.Map.42 - onSubscribe(FluxMap.MapSubscriber
  2. reactor.Flux.Map.46 - onSubscribe(FluxMap.MapSubscriber)

但是当从 kafka 主题读取数据时,它只被一个订阅者接收:

  • reactor.Flux.Map.42 - onNext({"greeting" : "Hello", "name" : "Prashant"})

它是 Webflux API 本身的错误吗?