问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3497 浏览

apache-kafka - kafka 工具的奇怪行为:kafka.tools.GetOffsetShell

我想使用时间戳获取偏移量,并尝试使用 kafka.tools.GetOffsetShell 命令工具。文档为:https ://cwiki.apache.org/confluence/display/KAFKA/System+Tools

我认为这个命令在我们指定的时间戳之前返回最新的 N 个偏移量。但是我尝试了几个命令并感到困惑......

返回:

然后:

返回:

然后:

返回空集!!!

这个工具是如何工作的?

0 投票
1 回答
943 浏览

message-queue - Kafka如何同时实现分布式处理和高可用?

我有一个由 n 个分区组成的主题。为了进行分布式处理,我创建了两个在不同机器上运行的进程。他们使用相同的分组 id 订阅主题并分配 n/2 个线程,每个线程处理单个流(每个进程 n/2 个分区)。

有了这个,我将实现负载分配,但是现在如果进程 1 崩溃,那么进程 2 将无法使用分配给进程 1 的分区中的消息,因为它在开始时只监听 n/2 个流。

否则,如果我为 HA 配置并在两个进程上启动 n 个线程/流,那么当一个节点发生故障时,所有分区将由另一个节点处理。但是在这里,我们已经妥协了分布,因为所有分区将一次由单个节点处理。

有没有办法同时实现以及如何实现?

0 投票
0 回答
206 浏览

apache-kafka - ConsumerConnector 类的 commitOffsets 方法死循环?

我有一个具有以下骨架的卡夫卡消费者:

这个消费者在正常情况下工作得很好。但是在获取消息之后并且在提交偏移量之前,如果与 Zookeeper 服务器的连接中断,并且消费者尝试提交偏移量,那么它就会陷入无限循环。有没有办法让它出来??

0 投票
1 回答
2852 浏览

apache-spark - kafka 只消费新消息

我的 Spark 流式传输作业正在使用来自 Kafka 的数据

每当我重新开始我的工作时,它就会从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要很多时间,如果我更改消费者组,它会立即使用新消息)

我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 时,都会从我离开的地方发送数据。

我的用例要求我忽略这些数据并只处理到达的数据。我怎样才能做到这一点?任何建议

0 投票
1 回答
954 浏览

apache-storm - 无法使用 strom 集群从 kafka 读取

我正在运行 Strom 集群,其中 2 个主管和 1 个 nimbus 正在运行。我正在阅读主题 id 为“topic1”的kafka。但是在用户界面上我遇到了错误

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrok

和 worker_*.log 显示以下错误。

java.io.FileNotFoundException: org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299) 中不存在文件'/app/storm/supervisor/stormdist/airpushTop-1-1431081661/stormconf.ser'~ [commons-io-2.4.jar:2.4] at org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1763) ~[commons-io-2.4.jar:2.4] at backtype.storm.config$read_supervisor_storm_conf .invoke(config.clj:212) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.daemon.worker$worker_data.invoke(worker.clj:182) ~[storm-core- 0.9.4.jar:0.9.4] 在 backtype.storm.daemon.worker$fn__5033$exec_fn__1754__auto____5034.invoke(worker.clj:398) ~[storm-core-0.9.4.jar:0.9.4] 在 clojure。 lang.AFn.applyToHelper(AFn.java:185) [clojure-1.5.1.jar:na] 在 clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] at clojure.core$apply.invoke(core.clj:617) ~[clojure-1.5.1.jar:na] at backtype.storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj:389) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon.worker$ _main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4] at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na ] 在 clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon.worker.main(未知来源) [storm-core-0.9.4. jar:0.9.4] 2015-05-08T17:09:19.209+0530 bsutil [错误] 停止进程:(“初始化错误”)java.lang.RuntimeException:(“初始化错误”)在 backtype.storm.util $exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon。 worker$fn__5033$mk_worker__5089.doInvoke(worker.clj:389) [storm-core-0.9.4.jar:0.9.4] at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1 .jar:na] 在 backtype.storm.daemon.worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.AFn.applyToHelper(AFn. java:172) [clojure-1.5.1.jar:na] at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException: java.lang .RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9 .4.jar:0.9.4]jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj :389) [storm-core-0.9.4.jar:0.9.4] at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na] at backtype.storm.daemon .worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4] at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1. jar:na] 在 clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.4.jar:0.9.4]jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj :389) [storm-core-0.9.4.jar:0.9.4] at clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na] at backtype.storm.daemon .worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4] at clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1. jar:na] 在 clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [storm-core-0.9.4.jar:0.9.4]RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] at backtype.storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj:389) [storm-core-0.9.4 .jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon.worker$_main.invoke(worker.clj: 500) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na] 在 clojure.lang.AFn。 applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/ Storm.kafka.DynamicBrokt backtype.storm.daemon.worker.main 的 topic1/partitions(未知来源)[storm-core-0.9.4.jar:0.9.4]RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na] at backtype.storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj:389) [storm-core-0.9.4 .jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:512) [clojure-1.5.1.jar:na] 在 backtype.storm.daemon.worker$_main.invoke(worker.clj: 500) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na] 在 clojure.lang.AFn。 applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/ Storm.kafka.DynamicBrokt backtype.storm.daemon.worker.main 的 topic1/partitions(未知来源)[storm-core-0.9.4.jar:0.9.4]storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj:389) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:512) [clojure -1.5.1.jar:na] 在 backtype.storm.daemon.worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.AFn。 applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na] at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException : java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [storm -core-0.9.4.jar:0.9.4]storm.daemon.worker$fn__5033$mk_worker__5089.doInvoke(worker.clj:389) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.RestFn.invoke(RestFn.java:512) [clojure -1.5.1.jar:na] 在 backtype.storm.daemon.worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4] 在 clojure.lang.AFn。 applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na] at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava.lang.RuntimeException : java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [storm -core-0.9.4.jar:0.9.4]512) [clojure-1.5.1.jar:na] 在 clojure 的 backtype.storm.daemon.worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4]。 lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na] at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava .lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown来源)[storm-core-0.9.4.jar:0.9.4]512) [clojure-1.5.1.jar:na] 在 clojure 的 backtype.storm.daemon.worker$_main.invoke(worker.clj:500) [storm-core-0.9.4.jar:0.9.4]。 lang.AFn.applyToHelper(AFn.java:172) [clojure-1.5.1.jar:na] at clojure.lang.AFn.applyTo(AFn.java:151) [clojure-1.5.1.jar:na] ava .lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown来源)[storm-core-0.9.4.jar:0.9.4]RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [风暴核心0.9.4.jar:0.9.4]RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = noNode for /brokers/topics/topic1/partitions atstorm.kafka.DynamicBrokt backtype.storm.daemon.worker.main(Unknown Source) [风暴核心0.9.4.jar:0.9.4]

我正在使用 kafka_2.11-0.8.2.1、apache-storm-0.9.4 和 zookeeper-3.4.6。

kafka 和 cluster 中没有不匹配的 zookeeper jar 版本。

请在这里帮助我。

提前谢谢了。

0 投票
0 回答
272 浏览

apache-kafka - sockect 127.0.0.1 的 kafka 服务器出错

我正在尝试运行 kafka -storm-cassandra ,在我的情况下,tail2kafka 本身就是一个生产者,当我开始消费主题时,它会抛出下面提到的错误。请帮帮我。

谢谢

[2015-05-13 15:28:51,784] 由于错误 (kafka.network.Processor) java.lang.OutOfMemoryError: Java heap space at kafka.api.ProducerRequest$$anonfun$1$ 错误关闭 /127.0.0.1 的套接字$anonfun$apply$1.apply(ProducerRequest.scala:45) at kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42) at scala.collection.TraversableLike$$anonfun$map $1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala: 282) 在 scala.collection.immutable.Range$$anon$1.foreach(Range.scala:27​​4) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:206) 在 scala.collection.immutable.Range.map (Range.scala:39) 在 kafka.api。ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42) at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike. scala:227) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227) 在 scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282) 在 scala.collection .immutable.Range$$anon$1.foreach(Range.scala:27​​4) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227) at scala.collection.immutable.Range.flatMap(Range.scala:39 ) 在 kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38) 在 kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36) 在 kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys .scala:36) 在 kafka.network.RequestChannel$Request.(RequestChannel.scala:53) 在 kafka.network.Processor.read(SocketServer.scala:353) 在 kafka.network.Processor.run(SocketServer.scala:245) 在 java .lang.Thread.run(Thread.java:745)

我的消费者代码是

导入结构导入时间

导入 kafka.io 导入 kafka.request_type

类消费者(kafka.io.IO):

CONSUME_REQUEST_TYPE = kafka.request_type.FETCH

MAX_SIZE = 1024 * 1024

# 秒。DEFAULT_POLLING_INTERVAL = 2

def init (self, topic, partition=0, host='localhost', port=9092): kafka.io.IO. 初始化(自身,主机,端口)

def consume(self): """ 从主题队列中消费数据。"""

def loop(self): """ 以阻塞方式循环来自队列的传入消息。设置polling检查间隔以秒为单位。"""

# 请求类型 ID + 主题长度 + 主题 + 分区 + 偏移量 + 最大尺寸 def request_size(self): return 2 + 2 + len(self.topic) + 4 + 8 + 4

def encode_request_size(self): return struct.pack('>i', self.request_size())

def encode_request(self): 长度 = len(self.topic)

def send_consume_request(self): self.write(self.encode_request_size()) self.write(self.encode_request())

def read_data_response(self): buf_length = struct.unpack('>i', self.read(4))[0]

def parse_message_set_from(self, data): 消息 = [] 已处理 = 0 长度 = len(data) - 4

0 投票
4 回答
24190 浏览

kafka-consumer-api - kafka 同步:“java.io.IOException:打开的文件太多”

我们遇到了卡夫卡的问题。有时突然,我们会在没有警告的情况下退出同步并在发出事件时开始出现异常。

我们得到的例外是

似乎这是 Kafka 在许多情况下抛出的一般异常。我们对其进行了一些调查,我们认为根本原因是尝试向某个主题发出事件时,它失败了,因为 kafka 没有该主题的领导分区

有人可以帮忙吗?

0 投票
1 回答
1201 浏览

add - Kafka 消费者 - 如何添加主题

在一个场景中,我有 N 个消费者(所有消费者都有 1 个流/没有分区),每个消费者都订阅了一组单独的主题,如何处理由生产者添加的新主题?

我应该为每个添加的主题创建新的消费者吗?或者我可以将此主题添加到已经工作的消费者吗?(怎么做?)

或者更好地保留 1 个具有 N 个消费者的消费者组,而不是在 N 个消费者(流)之间划分主题?

0 投票
1 回答
6135 浏览

apache-kafka - Kafka Java API 偏移操作说明

我正在尝试使用最新的 kafka_2.10-0.8.2.1 使用低级 Consumer Java API 手动管理偏移量。为了验证我从 Kafka 提交/读取的偏移量是否正确,我使用了 kafka.tools.ConsumerOffsetChecker 工具。

这是我的主题/消费者组的输出示例:

  这是我对结果的解释:

Offset = 5 --> 这是我的“elastic_search_group”消费者的当前偏移量

logSize = 29 --> 这是最新的偏移量 - 下一条消息的偏移量将到达这个主题/分区

Lag = 24 --> 29-5 - 我的“elastic_search_group”消费者尚未处理多少消息

Pid - 分区 ID

Q1:这是正确的吗?

现在,我想从我的 Java 消费者那里获得相同的信息。在这里,我发现我必须使用两个不同的 API:

卡夫卡.javaapi。OffsetRequest获取最早和最新的偏移量,但是 kafka.javaapi。OffsetFetchRequest获取当前偏移量。

要获得最早(或最新)的偏移量,我会:

为了获得当前的偏移量,我必须使用完全不同的 API:

Q2:对吗?为什么有两个不同的 API 来获取非常相似的信息?

Q3:我在这里使用哪个versionId和correlationId有关系吗?我虽然对于 pre-0.8.2.1 kafka 的 versionId 应该是 0,对于 0.8.2.1 及更高版本应该是 1 - 但似乎它也适用于 0 的 0.8.2.1 - 见下文?

因此,对于上述主题的示例状态,以及 ConsumerOffsetChecker 的上述输出,以下是我从 Java 代码中得到的信息:

当前偏移=5;最早偏移=29;最新偏移=29

'currentOffset' 似乎没问题,'latestOffset' 也是正确的,但 'earliestOffset' 呢?我希望它至少是'5'?

Q4:earlyOffset 怎么会比 currentOffset 高?我唯一的怀疑是,由于保留政策,可能来自该主题的消息被清除了……。还有其他可能发生的情况吗?

0 投票
1 回答
723 浏览

apache-kafka - Kafka如何并行消费多个主题

我有多个主题,并且来自每个主题的消息具有不同的行为。所以我必须以不同的方式处理每条消息。因此,我将消息及其主题与 Kafka 流区分开来。是否有任何解决方案,我希望以代码片段为例。感谢您的任何建议。