问题标签 [reactive-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1685 浏览

scala - Akka Streams Kafka - 消费者单元测试

我试图通过设置 Kafka 服务器并使用生产者发送消息来在本地测试我的代码,但我想知道是否有一种方法可以为这段代码编写单元测试(测试消息是否由消费者是正确的)。

0 投票
1 回答
209 浏览

scala - 在 Akka Streams Kafka 中构造 ProducerMessage 时如何配置偏移参数?

我正在使用 Scala 2.11 和 Akka Streams Kafka 0.17。

我有一个,其中:

  • ASource是使用Source.actorRef. 在这里,actor 被安排在某个固定的时间间隔运行并连续生成消息,这些消息被发送到流中。
  • 我已将 aProducer作为Flow. 生产者推ProducerMessage.Message送到 Kafka 主题。
  • 一些数据库操作。

我在构建 时遇到问题ProducerMessage.Message,如下所示:

我可以轻松地传递record包含实际消息的参数。但我不知道要在passThrough参数中传递什么。根据文档

passThrough字段可以保存通过Consumer#flow并包含在 中的任何元素Result。当需要在下游操作上传递一些上下文时,这很有用。这可以通过解压缩/压缩来完成,但这更方便。例如,它可以是一个ConsumerMessage.CommittableOffsetConsumerMessage.CommittableOffsetBatch可以稍后在流程中提交。

就我而言,没有任何 Kafka 消费者订阅 Kafka 主题并为我的流生成Source(comittableSourceplainSource)。在这种情况下,我会按照文档中的说明传递消费者偏移量。但就我而言,演员正在模拟这样的消费者。这意味着我无权访问ConsumerMessage.CommittableOffset. 那么我在passThrough这里为参数传递什么?在这种情况下,最佳做法是什么?

0 投票
0 回答
1360 浏览

apache-kafka - 在kafka中需要<5ms的可预测恒定延迟

我们计划使用 3 个 kafka-brokers 和一个 zookeeper 节点。我们为每个主题保留了一个分区。但是我们对 5ms 以下的延迟有严格的要求。我们正在使用 reactive-kafka 客户端库https://github.com/akka/reactive-kafka

例如,我们有一个生产者每 30 毫秒产生 128 字节(大约)数据,消费者消费者每 50 毫秒产生 128 字节(大约)数据。

我们尝试过

在生产者方面。

以 50 毫秒在消费者端轮询

但是一旦我们将生产频率提高到 1 秒,延迟就开始变得不可预测,从 1 毫秒到 70 毫秒。

那么,如果我们总是要求延迟低于 5 毫秒,那么需要在 kafka-broker、生产者和消费者级别进行哪些调整。

0 投票
1 回答
203 浏览

scala - 将任意数量的源与物化值相结合

给定xs: Seq[Source[A, Mat]]一个将单个物化器组合成一个单一物化器的功能,是否可以合并xs成一个物化成一个聚合的聚合源Mat

考虑这个实际的例子:

拥有NN Sources 类型的Kafka 主题Source[A, Consumer.Control],我想将它们组合(合并)成相同的Source[A, Consumer.Control],以便结果Consumer.Control处理所有原始控件。

对于 2 个来源,这是微不足道的。有这个:

我可以做这个:

Source.combineMat(s0, s1)(Merge(_))(new ConsumerControlBoth(_, _))

很想去

但是,我担心,由于每个combineMat都试图均匀地分配从其 2 个输入中获取的元素,这可能导致最终分布不均​​匀:从最后一个来源获取元素将有概率1/2,从倒数第二个获取-1/4等等。

另一方面,有 vararg 方法用于组合源而不考虑具体化的值,例如Source.combine,它的类型为Source[A, NotUsed]。我一直无法弄清楚如何调整它以使用组合物化价值。

我在这个假设上是对的,还是从这个意义上说结果是一致的?在一般情况下如何正确执行此操作?

升级版。我刚刚想出了这个(只是一个 POC,没有健全性检查等):

看起来没有上面提到的缺点,但我不确定我是否喜欢它。任何意见?

0 投票
1 回答
1688 浏览

java - 限制读取 Mongodb 集合的 Reactor Flux 的吞吐量

我正在使用 Spring 5,详细介绍 Reactor 项目,从庞大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的生成比使用它们的程序快得多。所以,我需要实现一些背压机制。

假设我想要每秒 100 条消息的吞吐量。谷歌搜索了一下,我决定结合该buffer(int maxSize)方法的特性,将结果与使用预定义间隔发出消息的 a一起压缩。Flux

有没有更聪明的方法来做到这一点?我的意思是,在我看来,它有点复杂(总体而言,拉链部分)。

0 投票
1 回答
1092 浏览

scala - Akka Stream Kafka,到达日志末尾时完成流

我正在使用Akka Streams Kafka,并且正在寻找一种方法来执行以下操作:

  • 从偏移量启动流x
  • 依次消费项目x, x+1, x+2.. 直到最后一个项目
  • 消费完最后一项后,完成流

代码看起来像

它会在最后一个元素被读取后完成。也许这不是这个库的使用方式。我怎样才能做到这一点?

0 投票
1 回答
1253 浏览

scala - 失败时优雅地重启 Reactive-Kafka Consumer Stream

问题 当我重新启动/完成/停止流时,旧的消费者不会死/关机:

描述 我正在构建一个服务,它接收来自 Kafka 主题的消息并通过 HTTP 请求将消息发送到外部服务。

  1. 与外部服务的连接可能会中断,我的服务需要重试请求。

  2. 此外,如果 Stream 中有错误,则需要重新启动整个流。

  3. 最后,有时我不需要流及其相应的 Kafka 消费者,我想关闭整个流

所以我有一个流:

发送Http请求sourceFunction

我遵循了新文档中的新 Kafka Consumer Restart 说明

在一个复杂的 Akka 流中,有一个问题在讨论这个非终止的消费者,但还没有解决方案。

是否有强制终止 Kafka 消费者的解决方法

0 投票
1 回答
447 浏览

java - 使用 Akka Kafka Streams 时不兼容的等式约束

我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:

但是上面的代码在 runwith() 处显示编译器错误: 在此处输入图像描述

这是 KafkaJacksonSerializer 的代码:

我不确定到底是什么问题。但是下面的代码没有显示任何错误:

有人可以帮我确定这里出了什么问题吗?

0 投票
1 回答
5077 浏览

java - Kafka 代理问题 - UnknownServerException

我们的应用程序使用springBootVersion = 2.0.4.RELEASE依赖compile('io.projectreactor.kafka:reactor-kafka:1.0.1.RELEASE')关系。

我们拥有的 Kafka Broker 版本为1.0.1.

reactor.kafka.sender.SenderRecord当我们通过创建和响应 Kafka 时间歇性地将消息发送到Kafka 时,reactor.kafka.sender.SenderResult.exception()我们有

java.lang.RuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request填充在异常中。

重试几次后,消息成功通过。

在代理日志上,以下错误被多次打印,没有任何堆栈跟踪

[2019-02-08 15:43:07,501] ERROR [ReplicaManager broker=3] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)

price-promotions-local-event我们的话题在哪里。

我在网上看过,但没有明确的解决方案或方法来分类这个问题,非常感谢您的帮助。

0 投票
1 回答
489 浏览

java - Project Reactor Kafka 的 Zipkin 跟踪

我需要在一项基于 java 的服务中实现 Zipkin 跟踪,该服务使用 Project Reactor Kafka 进行反应流和非阻塞 IO 操作。我找不到任何支持响应式 Kafka 的勇敢的仪器库。

标准的 Kafka 客户端勇敢工具:

https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients

不支持 Reactive-Kafka。

是否有库或存储库可以帮助我在 java 中使用 Zipkin 跟踪响应式 Kafka?