问题标签 [reactive-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
315 浏览

scala - 如何检查 Kafka 是否与 ReactiveKafka 连接?

我正在编写一个在 Scala 中使用 reactive-kafka 模块的代码。我的代码使用以下行:

我有两个问题:

  1. 我看到我的代码导入了包 com.softwaremill.react.kafka。你能给我一个详细文档的链接吗?到目前为止我发现的所有信息都太简洁了。

  2. 我想检查服务器是否仍然连接到 Kafka。如何使用 com.softwaremill.react.kafka 包来做到这一点?

0 投票
1 回答
279 浏览

scala - 如何使用发布者(在响应式 kafka 中)生成消息?

我必须使用来自 reactive-kafka 模块的代码:https ://github.com/akka/reactive-kafka/blob/master/README.md

我的代码以:

我想产生一条消息(由发布者)。我必须编写什么代码才能实现它?

0 投票
1 回答
94 浏览

apache-kafka - 反应式卡夫卡:为什么发布者“与”消费者“同行”?

我查看了以下文档:https ://github.com/akka/reactive-kafka ,我看到了以下代码片段:

我知道“发布者”应该向 Kafka 写消息。然而,Kafka 中的 Consumer 的含义恰恰相反,即消费者从 Kafka 中读取消息。如果是这样,'publisher' 与 kafka.consume(ConsumerProperties...) 有什么关系?

0 投票
1 回答
1613 浏览

scala - 如何将 Bytestring 转换为 Source[Bytestring, Any]

我从 Kafka Reactive Streams 消费者那里收到一个文件作为字节串;我想用这个 Bytestring 作为实体 HttpEntity.Default 构造一个 akka-http 请求。HttpEntity.Default 需要 Source[Bytestring, Any] 作为其参数之一。

将两者联系起来的最佳方式是什么?

0 投票
0 回答
208 浏览

scala - 通过源 [Bytestring] 使用 Akka-http 链 Akka 流式传输 Kafka

我从 Kafka Reactive Streams 消费者那里收到一个文件作为字节字符串,我想将它发送到服务。

是否可以从 Kafka Reactive Stream Consumer 中提取 Source[Bytestring, Any],这样我就可以将流从 Kafka 链接到 Akka-http,而无需在内存中加载整个 Bytestring 然后执行 akka-http 请求?

0 投票
1 回答
1434 浏览

scala - 如何使用 Akka-Stream 在 Reactive Kafka 中“分块和重新组合”大消息

使用 Kafka 发送大文件时,是否可以跨分区分发,然后使用 Akka-Stream 重新组装?如本演示文稿所述:

http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297

0 投票
1 回答
395 浏览

scala - Kafka - 处理无法处理消息的消费者的模式

我正在尝试实现简单的服务,从 kafka 中提取消息,将它们包装在一些数据中并发送到外部服务。

处理消息时处理外部服务不可用的常见模式是什么?

到目前为止,我仅在对外部服务的请求成功时才手动提交消息。如果消息未提交,我希望 kafka 在一段时间后重新发送消息,以便处理外部服务失败对消费者来说是透明的。我找不到办法做到这一点。但是,如果我不做一些反模式并且有更好的解决方案,我很好奇。

0 投票
1 回答
1410 浏览

scala - Reactive-Kafka Stream Consumer:发生死信

我正在尝试使用 akka 的反应式 kafka 库来使用来自 Kafka 的消息。我正在打印一条消息,然后我得到了

这是我正在执行的代码

0 投票
2 回答
1457 浏览

scala - Reactive-Kafka:如何在异常时暂停消费者并按需重试

我已经在 Google Groups 上问过这个问题,但还没有收到任何回复。所以在这里为不同的观众发布这个。

我们正在为我们的应用程序使用 Reactive-Kafka。我们有一个如下场景,如果在处理消息时发生任何异常,我们希望停止向消费者发送消息。该消息应在规定的时间后或在消费者方的明确要求下重试。假设使用我们目前的方法,如果消费者的数据库关闭了一段时间,它仍然会尝试从 kafka 读取并处理消息,但由于数据库问题而导致处理失败。这将使应用程序不必要地忙碌。取而代之的是,我们希望暂停消费者以在规定的时间内接收消息(例如,等待 30 分钟重试)。我们不确定如何处理这种情况。

是否可以这样做?我错过了什么吗?

以下是从响应式 kafka 中获取的示例代码:

0 投票
1 回答
86 浏览

akka - 在反应式 kafka 中将新创建的主题添加到订阅中

我在我们的 play scala 项目中开发响应式 kafka,在项目中我们创建了 5 个由消费者组订阅并且运行良好的主题,现在的问题是我创建了一个新主题,我如何将此主题添加到现有的消费者组(是有可能)我的代码是:

有什么方法可以向消费者添加主题