问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
12881 浏览

java - Kafka突然重置消费者Offset

我正在使用 Kafka 0.8 和 zookeeper 3.3.5。实际上,我们有十几个主题正在消费,没有任何问题。

最近,我们开始喂养和消费一个行为怪异的新主题。消耗的偏移量突然被重置。它尊重我们设置的 auto.offset.reset 策略(实际上是最小的),但我不明白为什么该主题突然重置其偏移量。

我正在使用高级消费者。

这是我发现的一些错误日志:我们有一堆这样的错误日志:

每次发生此问题时,我都会看到 WARN 日志:

然后真正的问题发生了:

现在的问题是:是否有人已经经历过这种行为?当 Kafka 决定重置其偏移量时,是否有人可以告诉我 auto.offset.reset 是最大还是最小?

谢谢你。

0 投票
1 回答
721 浏览

apache-storm - 将storm的字数拓扑与kafka集成

我正在尝试将storm的字数统计程序与kafka集成,因为我的生产者工作正常,即它正在读取文本文件并将每一行作为消息发送,我可以在简单的消费者控制台中看到这些消息。现在为了将它与storm集成,即将这些消息/行发送到消费者spout,我刚刚用来自storm-spout集成依赖项的kafka spout替换了之前的单词统计程序的storm spout,并且程序的其余部分是相同的,我正在尝试在eclipse中运行它但它没有被执行,我不知道问题是什么,甚至不知道我是否以正确的方式做,这是我的主要课程 -

有 2 个螺栓 WordSplitterBolt() 和 WordCounterBolt() ,Wordsplitterbolt 将每一行/消息分成标记/单词,WordCounterBolt 正在计算每个单词。谁能告诉我我做错了什么?我需要创建自己的 spout 而不是使用预定义的 KafkaSpout 吗?我的主要课程正确吗?

0 投票
1 回答
388 浏览

java - Apache kafka 高级消费者——理解

我一直在寻找 Apache kafka 以实现高级消费者(我不想玩消息,我只需要将数据放入 MongoDB)v0.8.1.1

我查看了以下链接,其中显示了有关如何实现消费者的非常详细的信息。

Apache Kafka 消费者 wiki 另一个 kafka 消费者

但是对于所有线程关闭后消费者如何重新启动,我仍然一无所知。例如,假设我有 4 个消费者线程正在运行,它们消耗了来自 kafka 代理的所有消息,所以一旦没有消息,所有消费者将什么也不做,在特定超时后它将被关闭,所以我不确定消费者是如何再次当 kafka 代理中有新消息时重新启动。

有人可以分享一些代码或至少对此有一些指示。还有一种方法可以让我们在一些回调方法中拥有我们的业务逻辑,当有消息而不是 while 循环时,它会被调用。

0 投票
1 回答
13291 浏览

apache-kafka - 在 0.8.1 版本的 kafka 中创建主题时的 delete.retention.ms

我在 0.8.1 版本的 kafka 中创建主题时配置了这个属性 delete.retention.ms。这个属性有什么作用?是否会在特定时间间隔后从 kafka 删除消息。

0 投票
1 回答
2499 浏览

apache-kafka - 为什么这个 Kafka 消费者没有关闭?

我期望消费测试只读取一条消息并关闭。但是,即使在我调用consumer.shutdown(). 想知道为什么?

测试

被测类

0 投票
1 回答
198 浏览

apache-kafka - 在storm中存储批量kafka消息需要一些例子

我尝试使用普通消费者示例来检索消息。但现在我的消费者是风暴,所以我需要在风暴中存储批量 kafka 消息需要一些示例。

0 投票
3 回答
117828 浏览

apache-kafka - 向 Kafka 发送消息时是否需要密钥?

目前,我正在发送没有任何密钥的消息作为密钥消息的一部分,它仍然可以使用delete.retention.ms吗?我需要将密钥作为消息的一部分发送吗?将密钥作为消息的一部分这样好吗?

0 投票
1 回答
1163 浏览

java - 如何在 apache kafka 上为单个队列创建 Multiconsumer?

嗨,我的场景是有一个队列,大量资源将消息放入该队列,并且许多消费者阅读消息并执行特定工作。

对于这种情况,我使用此命令在 Kafka 中创建一个主题

现在我开发了一个 java 类来使用它

和另一个用于生成消息的 java 类

,并且正如 Apache 文档所说,如果主题想要充当队列,消费者应该有相同的 group.id,我做到了,但是当我运行 2、3 或事件更多消费者时,只有其中一个得到消息和其他人没有做任何事情。

事实上,我想要一个队列,它的排序对我来说并不重要,对我来说重要的是每条消息都只被一个消费者消费。

我想知道是否可以在 Kafka 中实现它,或者我应该使用其他产品,如 ActiveMQ、HornetMQ ......?

0 投票
4 回答
8958 浏览

python - Kafka-python 获取主题的分区数

我正在使用:https ://github.com/mumrah/kafka-python作为 Python 中的 kafka api。我想获取指定主题的分区数。我怎么做?

0 投票
2 回答
600 浏览

apache-kafka - 阿帕奇卡夫卡消费者连接

我正在查看Apache Kafka 的文档。

消费者使用 Zookeeper 的 IP 地址/端口连接到 Kafka。

是否可以使用代理的 IP 地址/端口?