问题标签 [kafka-topic]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
334 浏览

apache-kafka - Zookeeper 如何从 __consumer_offsets 主题中检索消费者偏移量?

这是“ zookeeper 在哪里存储 Kafka 集群和相关信息? ”的后续问题,基于 Armando Ballaci 提供的答案。

现在很明显,消费者偏移量存储在 Kafka 集群中一个名为__consumer_offsets. 没关系,我只是想知道这些偏移量的检索是如何工作的。

主题不像 RDBS,我们可以基于某个谓词查询任意数据。例如 - 如果数据存储在 RDBMS 中,可能像下面这样的查询将获取某个消费者组的特定消费者的主题的特定分区的消费者偏移量。

select consumer_offset__read, consumer_offset__commited from consumer_offset_table where consumer-grp-id="x" and partitionid="y"

但显然这种检索在 Kafka Topics 上是不可能的。那么从主题中检索机制是如何工作的呢?有人可以详细说明吗?

(来自 Kafka 分区的数据在 FIFO 中读取,如果遵循 Kafka 消费者模型来检索特定偏移量,则必须处理大量额外数据并且速度会很慢。所以我想知道它是否以其他方式完成。 ..)

0 投票
0 回答
712 浏览

apache-kafka - Zookeeper:cnxn.saslServer 为空,Kafka:仲裁成员的 saslToken 为空

为了只提供对 kafka 的访问以创建、删除主题,我正在 kafka 和 zookeeper 之间创建一个纯文本 SASL 安全性。我收到以下错误,无法弄清楚原因。

docker-compose-sasl-plaintext.yml

kafka_server_jaas.conf

0 投票
2 回答
670 浏览

elasticsearch - 如何使用logstash将Kafka主题键值索引为字段?

我想知道将 Kafka 主题键值与消息一起作为单独的字段包含在 Elasticsearch 中使用 Logstash 进行索引。Kafka“主题”天气包含以下消息作为键值对:

波士顿:99
波士顿:89
纽约:75
纽约:85

我正在使用以下 Logstash 配置进行索引:

但是,只有值被索引,而不是键。索引如下所示:

我还需要在索引中包含城市名称(主题的键)。一旦我拥有两者,最终目标是在 Kibana 中针对每个城市的天气进行不同类型的可视化。

提前致谢。

0 投票
2 回答
596 浏览

apache-kafka - 尽管具有相同的配置,但具有不同保留的 Kafka 主题

我有一个关于保留主题消息的问题。

我有以下情况:

  • 两个 Kafka 主题;
  • 经纪人有财产log.retention.hours=1

描述两个主题中的每一个的命令:

问题是:

  1. 数字入职主题如何保留 1 天和数字贷款 1 周(根据我的观察 - 尝试多次从队列中获取所有消息,一个主题的保留时间为 1 天,另一个主题为 1星期)?

  2. --describe除了命令和 server.properties 文件之外,还有其他地方设置它吗?

编辑1:

server.properties 文件:

0 投票
1 回答
458 浏览

apache-kafka - 通过 Kafka 压缩主题遵守 GDPR

我想问你一些关于 apache kafka 和压缩主题的问题。我们想在 kafka 压缩主题上提供一些 PII 数据。我们想通过 tombstone 删除这个主题的数据。目前有多个问题需要我们验证我们的假设:

  1. 是否有其他公司通过像 KIP-354 建议的带有墓碑生成的压缩主题来满足 kafka 中的 gdpr 要求(遗忘权)https://cwiki.apache.org/confluence/display/KAFKA/KIP-354% 3A+Add+a+Maximum+Log+Compaction+Lag ?
  2. 我们的假设是否正确,只有当记录不在活动段文件中时才会触发压缩。所以在我们看来,kafka 文档需要修改,在 kafka 文档点 4.8 中添加:主题的 max.compaction.lag.ms 可以用来保证消息写入时间和时间之间的最大延迟消息变得有资格进行压缩。在这里它应该添加条件,我们要压缩的消息不应该在活动段文件中。这是 max.compaction.lag.ms 功能的错误还是设计的?我们目前还不确定。
  3. 是否仅在插入新消息后才触发压缩?或者是否还有一个压缩非活动段文件的异步过程?

感谢您的回答;-)

0 投票
1 回答
94 浏览

apache-kafka - 并行从单个流主题写入不同的主题

我有一个流,它将消息映射到两个不同的 map() 调用,并进一步被过滤并写入两个不同的主题。

有没有办法可以并行运行 stream.map(logc1OnData)... 和 stream.map(logic2OnData) ?看起来他们一个接一个地运行,即第一个映射被执行并写入topic1,然后第二个映射被执行并写入topic2 FYI ..我不想要num.threads.count,因为我的流输入来自单个主题和我正在运行同一应用程序的多个实例以从源主题主题中读取,以在使用时实现并行性。

我正在寻找的是在执行和写入不同主题时的并行性

0 投票
0 回答
89 浏览

apache-kafka - 加快删除具有相同键的旧记录(日志压缩主题)

我在我的 Mac 上使用默认服务器设置运行 Kafka 代理(版本 2.12)。我创建了一个日志压缩主题,具有以下配置 -

我生成了三个具有相同键/值的记录。5 分钟后(log.retention.check.interval.ms=300000),我从头开始阅读该主题。我在主题中看到以下三个记录。我期待 1 条记录。

10 分钟后,我生成了一条具有相同键、不同值的记录 (key1:bbb)。然后在 5 分钟后,从头开始阅读主题导致以下两条记录。我期待 1 条记录 (key1:bbb)。

我需要设置任何其他主题配置吗?任何帮助或指示将不胜感激。

0 投票
1 回答
2969 浏览

logging - 理解 kafka log.dirs

我有 Kafka 集群,并且 log.dirs=/data/kafka设置为 server.properties 中的数据目录。由于这些日志占了很大一部分,我的 DATA 分区不断变满。(谈论主题目录中的二进制日志,例如000000000000000.log)我在文档中阅读了有关此参数 的信息(log.dirs保存日志数据的目录。如果不设置,则使用log.dir中的值)

而且我还没有完全理解含义此外,它们可以删除,应该配置哪个保留?是否建议将其与数据目录分开?谢谢

0 投票
1 回答
107 浏览

apache-kafka - 使用 FilePulse 从 XML 中输入 Kakfa 主题中的数据 - ExplodeFilter

我正在创建一个来自 xml 并以 avro 格式写入主题的 kakfa 主题。我正在使用文件脉冲来执行此操作,并且在文档中我看到了 ExplodeFilter。我尝试根据文档进行配置,但它不起作用。连接 docker 控制台出现以下错误:

按照docker-compose的yaml:

按照输入的xml:

跟随连接器:

0 投票
1 回答
164 浏览

apache-kafka - kafka 自定义分区器和重新分区

我们可以为 kafka 主题指定自定义分区器。因此,kafka 生产者可以根据某些自定义算法确定性地将消息发送到特定分区。

现在的问题是,当我增加分区数量时,kafka 将如何在新分区之间重新分配现有消息?或者 Kafka 不会将消息分发到新分区?是否有可能触发这种重新分配?如果是这样,那么 kafka 将如何知道自定义分区器,因为那段代码驻留在生产者部分?