问题标签 [kafka-partition]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
294 浏览

python-3.x - 在多分区主题上正确查找和使用 Kafka 消息

我最近发现我一直在使用的一个主题是多分区而不是单分区。我需要重新配置我的消费者类来处理多个分区,但我有点困惑。我目前正在使用一个偏移组,test_offset_group为了下面的示例,我们称之为它。正常情况下,我会一直线性解析,及时继续前行;随着消息被添加到主题中,我将解析它们并继续前进,但如果发生崩溃或需要返回并重新运行前一天的提要,我需要能够按时间戳进行搜索。 Kafka在这个项目中是强制性的,所以我无法更改我正在使用的流数据服务的类型。

我像这样配置我的消费者:

如果我需要寻找时间戳,我将提供一个时间戳,然后使用以下方法寻找:

上述功能非常适合单个分区使用者,但是当您考虑多个分区时,这感觉非常笨拙和困难。我想我可以使用获取分区的数量, test_consumer.partitions_for_topic('test_topic") 然后遍历它们中的每一个,但是再一次,这似乎违背了 Kafka 的原则,我觉得应该有一种更简单的方法来做到这一点。

总结:我想了解如何使用 offset_group 功能寻找具有多个分区的多个偏移量,并且我想确认,通过执行上述操作,我实际上忽略了除 0 之外的所有分区?

0 投票
3 回答
5337 浏览

apache-kafka - Kafka log.segment.bytes vs log.retention.hours

我正在关注“Kafka:权威指南”第一版一书,以了解代理何时删除日志段。

根据我理解的文本,一个片段在关闭之前不会有资格被删除。段只有在达到 log.segment.bytes 大小时才能关闭(考虑 log.segment.ms 未设置)。一旦一个段符合删除条件,log.retention.ms 策略将应用于最终决定何时删除该段。

然而,这似乎与我在我们的生产集群(Kafka 版本 2.5)中看到的行为相矛盾。

一旦满足 log.retention.ms,日志段就会被删除,即使段大小小于 log.segment.bytes。

[2020-12-24 15:51:17,808] INFO [Log partition=Topic-2, dir=/Folder/Kafka_data/kafka] 由于保留时间 604800000 毫秒违规(kafka.log。日志)

[2020-12-24 15:51:17,808] INFO [Log partition=Topic-2, dir=/Folder/Kafka_data/kafka] 调度段删除 List(LogSegment(baseOffset=165828, size=895454171 , lastModifiedTime=1608220234000,最大时间=1608220234478)) (kafka.log.Log)

大小仍然小于 1GB,但该段已被删除。

该书在新闻发布时提到 Kafka 版本是 0.9.0.1 。在后来的 Kafka 版本中,这个设置也发生了变化。(我在 Kafka 文档中找不到任何具体提及此更改的内容)。以下是书中的片段。

在此处输入图像描述

0 投票
0 回答
21 浏览

kafka-partition - 如何在kafka中创建分区

我刚刚在新集群上升级了 kafka,并添加到我的 kubernetes 清单中:spec: kafka: version: 2.7.0 partitions: 12 replicas: 3

仍然当我通过执行以下操作检查 Zookeeper 时: kubectl -n centralised-logging exec my-cluster-kafka-2 -- bash bin/kafka-topics.sh --zookeeper zookeeper --describe --topic logstash

我看到我的 logstash 正在将数据发送到 12 个分区,但是当我重置偏移量时。我只看到一个。所以不确定如何在kafka中启用这些分区

0 投票
1 回答
289 浏览

spring-boot - Spring Boot kafka:微服务多实例、并发和分区

我有一个关于在 kafka 中发布和读取消息的方式的问题,用于微服务架构,具有相同微服务的多个实例用于写入和读取。我的主要问题是发布和读取的微服务配置了自动缩放,但默认实例数为 1。

关键是我有一个实体,我们称之为存储在 DDBB 中的“事件”,每个实体在 DDBB 中都有自己的 ID。当在特定实体中执行某些特定命令时(比如说 entityID = ajsha87),它必须发布一条消息,供消费者阅读。如果同一实体的每条消息都写在不同的分区中并同时使用(并发问题),我会遇到很多问题。

我的问题是,例如,我是否可以根据 entityID 设置该特定实体的所有事件将在哪些分区中发布。对于具有不同 ID 的另一个实体,我不关心分区,但同一实体的消息必须始终发布在同一个分区中,以避免消费者读取消息 (1) 之后发布的消息 (2)。有什么机制可以做到这一点,或者每次我保存我随机存储在 DDBB 中的实体时,它的消息将在其中发布的分区 ID?

消费者也是如此。只有一个消费者可以同时读取一个分区,因为如果不是,一个消费者号 1 可以从分区 (1) 中读取消息 (1) 与实体 (ID=78198) 相关联,然后另一个可以读取消息 (2) 从分区(1)与同一实体相关,并处理第一个之前的消息 2。

根据微服务自动缩放,每个实例只订阅一个分区有什么机制吗?

另一种选择是为每个新发布者实例动态分配一个分区,但我不知道如何动态配置它以根据微服务实例设置不同的分区 ID

顺便说一句,我正在使用弹簧靴

感谢您的回答和建议,如果我的英语不够好,请见谅。

0 投票
0 回答
228 浏览

apache-kafka - kafka Leader skew:在集群中添加新的broker并重新分配分区后,kafka brokers的leader出现倾斜

我有一个带有 3 个 zookeeper 节点和 4 个 Kafka 节点的 Kafka 集群。我在分区中添加了 2 个新代理。auto.leader.rebalance.enable所有代理上的配置都设置为 true leader.imbalance.check.interval.seconds,并且leader.imbalance.per.broker.percentage具有默认值。

为了在所有代理之间分配分区,我生成并重新分配了分区。但这一代人并没有在所有经纪人中产生平衡的领导力。

其中两个旧代理充当了 3 个分区的领导者,而 2 个新代理仅充当了一个分区的领导者。

重新分配后的分区分配: 在此处输入图像描述

这是通过运行./kafka-preferred-replica-election 命令解决的。

为什么 auto.leader.rebalance.enable 设置为 true 后没有自动触发再平衡。

另外,为什么会./kafka-reassign-partition.sh -generate产生偏斜的分区分配?

0 投票
2 回答
109 浏览

apache-kafka - 当生产者数量多于分区时,Kafka 默认分区器行为

kafka 常见问题页面

在 Kafka 生产者中,可以指定一个分区键来指示消息的目标分区。默认情况下,使用基于散列的分区器来确定给定键的分区 id

因此,具有特定键的所有消息将始终转到主题中的同一分区:

  1. 消费者如何知道生产者写入哪个分区,从而可以直接从该分区消费?
  2. 如果生产者多于分区,并且多个生产者正在写入同一个分区,那么偏移量是如何排序的,以便消费者可以消费来自特定生产者的消息?
0 投票
1 回答
151 浏览

apache-kafka - 如何在 Kafka Connect Sink 中指定 Kafka 主题的分区

我正在尝试为我的 Kafka Connect Sink 指定一个主题分区。特别是,我正在使用DataStax Apache Kafka Connector

有大量与为 Kafka Consumer 指定主题分区相关的文档和资源,例如:

但是,我根本找不到任何关于如何指定给定 Kafka Connect Sink 连接器读取哪个分区的信息。

似乎Confluent 连接器开发人员文档暗示应该可以指定分区,但我没有看到可以在通用Kafka Sink 配置属性文档DSE Kafka 连接器配置文档中设置的任何配置。

我的理解是,Kafka Connect Sink 基本上是写入给定数据存储的 Kafka Consumer 的特定实现。如果是这样,应该可以指定一个分区,对吗?还是我误解了 Kafka 连接器的工作原理?

0 投票
1 回答
42 浏览

apache-kafka - 基于分区Key的主题分区

我正在尝试根据分区键将流拆分为多个分区,但显然它不起作用。实现是这样的,我有一个类可以说Metrices

度量标准将一次异步使用一个,并且可能具有不同的MetriceType. 我想要实现的是在MetriceType. 到目前为止我所尝试的。

  1. 将消息键(partitionKey)设置为MetricesType

消息总是在Partition.Value= 0发布

  1. Confluent.Kafka 库中的 Partitoner 类,希望有类似于 Custom Partitoner的东西,这个链接但找不到任何 .net 实现。

所以我的问题是,在这种情况下,有没有办法根据属性拆分我的收入消息,MetriceType并将它们发布在他们的专用分区上(排序是必不可少的),或者我唯一的选择是使用和AdminClient编码创建一个主题分区计数,或者我可以研究另一种方法。提前致谢。

0 投票
2 回答
264 浏览

apache-kafka - 如何将特定 ID 分配给 Kafka 主题分区

我是 Apache Kafka 的新手。我想将我们的用户 id 作为 id 分配给主题分区。有没有办法将我们自己的用户 ID 分配给分区。我对此进行了几个小时的研究,但没有找到任何与将 ID 分配给分区相关的文章。

在向主题发布消息时,我想使用用户 ID 作为键。这样所有消息都进入同一个分区。而且我想确保一个分区应该只包含一个与用户相关的消息。

在消费来自分区的消息时,我可以在消费者中使用这个用户 ID 吗?

有没有办法实现这个功能?

0 投票
1 回答
121 浏览

apache-kafka - 如果 Kafka 的消费者处理消息的时间过长怎么办?Kafka 是否会将此分区重新指定给另一个消费者,并且消息会被双重处理?

假设Kafka, 1 partition, 2 consumers.(第二个消费者空闲)

假设第一个消费了一条消息,然后用其他 3 个服务处理它,突然粘在其中一个上,错过了 Kafka 的超时。

卡夫卡是否会将分区重新分配给第二个消费者并且消息将被双重处理(假设第一个最终成功)?