问题标签 [reactive-kafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Akka Streams Kafka - 消费者单元测试
我试图通过设置 Kafka 服务器并使用生产者发送消息来在本地测试我的代码,但我想知道是否有一种方法可以为这段代码编写单元测试(测试消息是否由消费者是正确的)。
scala - 在 Akka Streams Kafka 中构造 ProducerMessage 时如何配置偏移参数?
我正在使用 Scala 2.11 和 Akka Streams Kafka 0.17。
我有一个流,其中:
- A
Source
是使用Source.actorRef
. 在这里,actor 被安排在某个固定的时间间隔运行并连续生成消息,这些消息被发送到流中。 - 我已将 a
Producer
作为Flow
. 生产者推ProducerMessage.Message
送到 Kafka 主题。 - 一些数据库操作。
我在构建 时遇到问题ProducerMessage.Message
,如下所示:
我可以轻松地传递record
包含实际消息的参数。但我不知道要在passThrough
参数中传递什么。根据文档:
该
passThrough
字段可以保存通过Consumer#flow
并包含在 中的任何元素Result
。当需要在下游操作上传递一些上下文时,这很有用。这可以通过解压缩/压缩来完成,但这更方便。例如,它可以是一个ConsumerMessage.CommittableOffset
或ConsumerMessage.CommittableOffsetBatch
可以稍后在流程中提交。
就我而言,没有任何 Kafka 消费者订阅 Kafka 主题并为我的流生成Source
(comittableSource
或plainSource
)。在这种情况下,我会按照文档中的说明传递消费者偏移量。但就我而言,演员正在模拟这样的消费者。这意味着我无权访问ConsumerMessage.CommittableOffset
. 那么我在passThrough
这里为参数传递什么?在这种情况下,最佳做法是什么?
apache-kafka - 在kafka中需要<5ms的可预测恒定延迟
我们计划使用 3 个 kafka-brokers 和一个 zookeeper 节点。我们为每个主题保留了一个分区。但是我们对 5ms 以下的延迟有严格的要求。我们正在使用 reactive-kafka 客户端库https://github.com/akka/reactive-kafka。
例如,我们有一个生产者每 30 毫秒产生 128 字节(大约)数据,消费者消费者每 50 毫秒产生 128 字节(大约)数据。
我们尝试过
在生产者方面。
以 50 毫秒在消费者端轮询
但是一旦我们将生产频率提高到 1 秒,延迟就开始变得不可预测,从 1 毫秒到 70 毫秒。
那么,如果我们总是要求延迟低于 5 毫秒,那么需要在 kafka-broker、生产者和消费者级别进行哪些调整。
scala - 将任意数量的源与物化值相结合
给定xs: Seq[Source[A, Mat]]
一个将单个物化器组合成一个单一物化器的功能,是否可以合并xs
成一个物化成一个聚合的聚合源Mat
?
考虑这个实际的例子:
拥有N
由N
Source
s 类型的Kafka 主题Source[A, Consumer.Control]
,我想将它们组合(合并)成相同的Source[A, Consumer.Control]
,以便结果Consumer.Control
处理所有原始控件。
对于 2 个来源,这是微不足道的。有这个:
我可以做这个:
Source.combineMat(s0, s1)(Merge(_))(new ConsumerControlBoth(_, _))
很想去
但是,我担心,由于每个combineMat
都试图均匀地分配从其 2 个输入中获取的元素,这可能导致最终分布不均匀:从最后一个来源获取元素将有概率1/2
,从倒数第二个获取-1/4
等等。
另一方面,有 vararg 方法用于组合源而不考虑具体化的值,例如Source.combine
,它的类型为Source[A, NotUsed]
。我一直无法弄清楚如何调整它以使用组合物化价值。
我在这个假设上是对的,还是从这个意义上说结果是一致的?在一般情况下如何正确执行此操作?
升级版。我刚刚想出了这个(只是一个 POC,没有健全性检查等):
看起来没有上面提到的缺点,但我不确定我是否喜欢它。任何意见?
java - 限制读取 Mongodb 集合的 Reactor Flux 的吞吐量
我正在使用 Spring 5,详细介绍 Reactor 项目,从庞大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的生成比使用它们的程序快得多。所以,我需要实现一些背压机制。
假设我想要每秒 100 条消息的吞吐量。谷歌搜索了一下,我决定结合该buffer(int maxSize)
方法的特性,将结果与使用预定义间隔发出消息的 a一起压缩。Flux
有没有更聪明的方法来做到这一点?我的意思是,在我看来,它有点复杂(总体而言,拉链部分)。
scala - Akka Stream Kafka,到达日志末尾时完成流
我正在使用Akka Streams Kafka,并且正在寻找一种方法来执行以下操作:
- 从偏移量启动流
x
- 依次消费项目
x
,x+1
,x+2
.. 直到最后一个项目 - 消费完最后一项后,完成流。
代码看起来像
它会在最后一个元素被读取后完成。也许这不是这个库的使用方式。我怎样才能做到这一点?
scala - 失败时优雅地重启 Reactive-Kafka Consumer Stream
问题 当我重新启动/完成/停止流时,旧的消费者不会死/关机:
描述 我正在构建一个服务,它接收来自 Kafka 主题的消息并通过 HTTP 请求将消息发送到外部服务。
与外部服务的连接可能会中断,我的服务需要重试请求。
此外,如果 Stream 中有错误,则需要重新启动整个流。
最后,有时我不需要流及其相应的 Kafka 消费者,我想关闭整个流
所以我有一个流:
发送Http请求sourceFunction
我遵循了新文档中的新 Kafka Consumer Restart 说明
在一个复杂的 Akka 流中,有一个问题在讨论这个非终止的消费者,但还没有解决方案。
是否有强制终止 Kafka 消费者的解决方法
java - 使用 Akka Kafka Streams 时不兼容的等式约束
我正在尝试按照Akka Kafka Streams文档使用 Akka Kafka Streams。这是我的代码:
这是 KafkaJacksonSerializer 的代码:
我不确定到底是什么问题。但是下面的代码没有显示任何错误:
有人可以帮我确定这里出了什么问题吗?
java - Kafka 代理问题 - UnknownServerException
我们的应用程序使用springBootVersion = 2.0.4.RELEASE
依赖compile('io.projectreactor.kafka:reactor-kafka:1.0.1.RELEASE')
关系。
我们拥有的 Kafka Broker 版本为1.0.1
.
reactor.kafka.sender.SenderRecord
当我们通过创建和响应 Kafka 时间歇性地将消息发送到Kafka 时,reactor.kafka.sender.SenderResult.exception()
我们有
java.lang.RuntimeException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
填充在异常中。
重试几次后,消息成功通过。
在代理日志上,以下错误被多次打印,没有任何堆栈跟踪
[2019-02-08 15:43:07,501] ERROR [ReplicaManager broker=3] Error processing append operation on partition price-promotions-local-event-0 (kafka.server.ReplicaManager)
price-promotions-local-event
我们的话题在哪里。
我在网上看过,但没有明确的解决方案或方法来分类这个问题,非常感谢您的帮助。
java - Project Reactor Kafka 的 Zipkin 跟踪
我需要在一项基于 java 的服务中实现 Zipkin 跟踪,该服务使用 Project Reactor Kafka 进行反应流和非阻塞 IO 操作。我找不到任何支持响应式 Kafka 的勇敢的仪器库。
标准的 Kafka 客户端勇敢工具:
https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients
不支持 Reactive-Kafka。
是否有库或存储库可以帮助我在 java 中使用 Zipkin 跟踪响应式 Kafka?