问题标签 [akka-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
416 浏览

scala - 每秒可调整消息量的 Kafka 生产者

编写具有稳定但可调节输出的 Apache Kafka 生产者的最佳方法是什么。

示例:生产者应该向代理发送恒定的 1000 条消息/秒。在运行期间,输出应可调整为 10 或 10000 条消息/秒。

一种方法是设置一个调度程序,它每秒运行一次并批量发送预定义数量的消息。

另外:由于这个生产者应该是性能测试框架的一部分,所以需要发送的消息量非常高。有人将如何处理非常高的负载?使用 Akka 会有好处吗?

目标语言是 Scala,但任何语言的示例代码都非常受欢迎。

0 投票
0 回答
353 浏览

akka-stream - Akka Kafka Producer 中的错误处理

我正在使用 reactive-kafka-core 0.10.1(针对 Kafka 0.9.x)。每当回调函数遇到错误时,似乎 Kafka 生产者 Actor 就会停止。有没有办法自定义这种行为?我们的用例是尝试恢复并重新发送消息。

}

0 投票
1 回答
1882 浏览

scala - NoSuchMethodError:KafkaConsumer.subscribe

我使用以下依赖项:

还有演员:

然后我尝试启动上面的actor并引发以下异常:

我发现了以下问题,但它没有帮助。如何解决此 API 冲突?

0 投票
0 回答
208 浏览

scala - 通过源 [Bytestring] 使用 Akka-http 链 Akka 流式传输 Kafka

我从 Kafka Reactive Streams 消费者那里收到一个文件作为字节字符串,我想将它发送到服务。

是否可以从 Kafka Reactive Stream Consumer 中提取 Source[Bytestring, Any],这样我就可以将流从 Kafka 链接到 Akka-http,而无需在内存中加载整个 Bytestring 然后执行 akka-http 请求?

0 投票
3 回答
696 浏览

scala - Akka Stream TCP + Akka Stream Kafka 生产者不停止不发布消息并且不出错

我有以下流:

它可以正常工作一段时间,我可以使用填充在 Kafka 主题上的消息。但有时,显然是在一个随机的时间间隔,没有更多的消息发布,并且这段代码没有记录任何错误(printAndByeBye 将打印传递的消息并终止actor系统。)重新启动应用程序后,消息继续流动。

关于如何知道这里发生了什么的任何想法?

编辑:我把 Kamon 放在上面,我可以看到以下行为:

每个 Actor 的邮箱大小

每个 Actor 在邮箱中的时间

每个 Actor 的处理时间

看起来有些东西在没有通知流应该停止的情况下停止了,但我不知道如何使其明确并停止流。

0 投票
1 回答
1410 浏览

scala - Reactive-Kafka Stream Consumer:发生死信

我正在尝试使用 akka 的反应式 kafka 库来使用来自 Kafka 的消息。我正在打印一条消息,然后我得到了

这是我正在执行的代码

0 投票
2 回答
1478 浏览

akka-stream - 处理消息后提交 Kafka 消费者偏移量的好模式是什么?

我正在使用Akka Streams Kafka将 Kafka 消息通过管道传输到远程服务。我想保证该服务只接收一次每条消息(至少一次和最多一次传递)。

这是我想出的代码:

如代码所示,它映射原始消息的元组,以及传递给订阅者(发送到远程服务的参与者)的转换消息。元组的目的是在订阅者完成处理后提交偏移量。

关于它的某些东西似乎是一种反模式,但我不确定是否有更好的方法来做到这一点。有什么更好的方法建议吗?

谢谢!

0 投票
1 回答
397 浏览

scala - 在从 Kafka 主题中消费完所有可用消息后,如何返回包含消息列表的未来?

我可能错过了卡夫卡消费者的观点,但我想做的是:

消费者订阅一个主题,获取该主题内的所有消息并返回一个包含所有这些消息列表的 Future

我为尝试完成此操作而编写的代码是

不过,Future 永远不会返回,它会消耗必要的消息,然后继续重复轮询主题。有没有办法返回 Future 然后关闭消费者?

0 投票
2 回答
654 浏览

scala - Akka Kafka Producersettings:重载方法值适用于替代方案:

当我将生产者设置放入我的代码中时,我一次又一次地遇到问题。当我没有它时,一切正常。下面给出了包含所有代码的文件单个文件,我正在尝试将文件写入 kafka 流。并得到这个编译错误。

错误是

0 投票
1 回答
504 浏览

akka - Akka Kafka 流监督策略不起作用

我正在运行一个 Akka Streams Kafka 应用程序,我想在流消费者上合并监督策略,这样如果代理出现故障,并且流消费者在停止超时后死亡,主管可以重新启动消费者。

这是我的完整代码:

UserEventStream

StreamProcessorSupervisor(这是班级的主管UserEventStream班):

App(主要应用类):

application.conf

运行应用程序后,我故意杀死了 Kafka 代理,然后发现 30 秒后,演员正在通过发送毒丸来停止自己。但奇怪的是,它并没有像BackoffSupervisor策略中提到的那样重新启动。

这里可能是什么问题?