问题标签 [reactive-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 如何检查 Kafka 是否与 ReactiveKafka 连接?
我正在编写一个在 Scala 中使用 reactive-kafka 模块的代码。我的代码使用以下行:
我有两个问题:
我看到我的代码导入了包 com.softwaremill.react.kafka。你能给我一个详细文档的链接吗?到目前为止我发现的所有信息都太简洁了。
我想检查服务器是否仍然连接到 Kafka。如何使用 com.softwaremill.react.kafka 包来做到这一点?
scala - 如何使用发布者(在响应式 kafka 中)生成消息?
我必须使用来自 reactive-kafka 模块的代码:https ://github.com/akka/reactive-kafka/blob/master/README.md
我的代码以:
我想产生一条消息(由发布者)。我必须编写什么代码才能实现它?
apache-kafka - 反应式卡夫卡:为什么发布者“与”消费者“同行”?
我查看了以下文档:https ://github.com/akka/reactive-kafka ,我看到了以下代码片段:
我知道“发布者”应该向 Kafka 写消息。然而,Kafka 中的 Consumer 的含义恰恰相反,即消费者从 Kafka 中读取消息。如果是这样,'publisher' 与 kafka.consume(ConsumerProperties...) 有什么关系?
scala - 如何将 Bytestring 转换为 Source[Bytestring, Any]
我从 Kafka Reactive Streams 消费者那里收到一个文件作为字节串;我想用这个 Bytestring 作为实体 HttpEntity.Default 构造一个 akka-http 请求。HttpEntity.Default 需要 Source[Bytestring, Any] 作为其参数之一。
将两者联系起来的最佳方式是什么?
scala - 通过源 [Bytestring] 使用 Akka-http 链 Akka 流式传输 Kafka
我从 Kafka Reactive Streams 消费者那里收到一个文件作为字节字符串,我想将它发送到服务。
是否可以从 Kafka Reactive Stream Consumer 中提取 Source[Bytestring, Any],这样我就可以将流从 Kafka 链接到 Akka-http,而无需在内存中加载整个 Bytestring 然后执行 akka-http 请求?
scala - 如何使用 Akka-Stream 在 Reactive Kafka 中“分块和重新组合”大消息
使用 Kafka 发送大文件时,是否可以跨分区分发,然后使用 Akka-Stream 重新组装?如本演示文稿所述:
http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297
scala - Kafka - 处理无法处理消息的消费者的模式
我正在尝试实现简单的服务,从 kafka 中提取消息,将它们包装在一些数据中并发送到外部服务。
处理消息时处理外部服务不可用的常见模式是什么?
到目前为止,我仅在对外部服务的请求成功时才手动提交消息。如果消息未提交,我希望 kafka 在一段时间后重新发送消息,以便处理外部服务失败对消费者来说是透明的。我找不到办法做到这一点。但是,如果我不做一些反模式并且有更好的解决方案,我很好奇。
scala - Reactive-Kafka Stream Consumer:发生死信
我正在尝试使用 akka 的反应式 kafka 库来使用来自 Kafka 的消息。我正在打印一条消息,然后我得到了
这是我正在执行的代码
scala - Reactive-Kafka:如何在异常时暂停消费者并按需重试
我已经在 Google Groups 上问过这个问题,但还没有收到任何回复。所以在这里为不同的观众发布这个。
我们正在为我们的应用程序使用 Reactive-Kafka。我们有一个如下场景,如果在处理消息时发生任何异常,我们希望停止向消费者发送消息。该消息应在规定的时间后或在消费者方的明确要求下重试。假设使用我们目前的方法,如果消费者的数据库关闭了一段时间,它仍然会尝试从 kafka 读取并处理消息,但由于数据库问题而导致处理失败。这将使应用程序不必要地忙碌。取而代之的是,我们希望暂停消费者以在规定的时间内接收消息(例如,等待 30 分钟重试)。我们不确定如何处理这种情况。
是否可以这样做?我错过了什么吗?
以下是从响应式 kafka 中获取的示例代码:
akka - 在反应式 kafka 中将新创建的主题添加到订阅中
我在我们的 play scala 项目中开发响应式 kafka,在项目中我们创建了 5 个由消费者组订阅并且运行良好的主题,现在的问题是我创建了一个新主题,我如何将此主题添加到现有的消费者组(是有可能)我的代码是:
有什么方法可以向消费者添加主题