问题标签 [kafka-consumer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 为什么Kafka消费者性能很慢?
我有一个简单的主题,一个简单的 Kafka 消费者和生产者,使用默认配置。
程序很简单,我有两个线程。
在生产者中,它不断发送 16 字节的数据。
在消费者方面,它一直在接收。
我发现生产者的吞吐量大约是 10MB/s,这很好。
但是消费者的吞吐量只有 0.2MB/s。我已经禁用了所有调试日志,但这并没有让它变得更好。测试在本地机器上运行。任何机构都知道出了什么问题?谢谢!
我使用的代码如下: 生产者:
消费者:
apache-kafka - Apache Kafka 上下文中的“重新平衡”是什么意思?
我是 Kafka 的新用户,现在已经试用了大约 2-3 周。我相信目前我对 Kafka 在很大程度上是如何工作的有很好的了解,但是在尝试为我自己的 Kafka 消费者安装 API 之后(这很模糊,但我正在遵循应该的新 KafkaConsumer 的指导方针可用于 v 0.9,它在“主干”回购 atm 上)如果我有多个具有相同 groupID 的消费者,我会遇到从一个主题消耗的延迟问题。
在此设置中,我的控制台始终记录有关“重新平衡触发”的问题。当我将新的消费者添加到消费者组时是否会发生重新平衡,并且是否会触发它们以找出同一 groupID 中的哪个消费者实例将获得哪些分区或重新平衡完全用于其他用途?
我还从https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design看到了这段话,我似乎无法理解它,所以如果有人可以帮我制作不胜感激的感觉:
重新平衡是一组消费者实例(属于同一组)协调以拥有该组订阅的主题分区的互斥集的过程。在消费者组的成功重新平衡操作结束时,所有订阅主题的每个分区都将由组内的单个消费者实例拥有。再平衡的工作方式如下。每个代理都被选为消费者组子集的协调者。组的协调者代理负责编排针对订阅主题的消费者组成员资格更改或分区更改的重新平衡操作。它还负责将生成的分区所有权配置传达给正在进行重新平衡操作的组的所有使用者。
kafka-consumer-api - Kafka ConsumerConnector 的主题计数参数是什么
我是 apache kafka 的新手,并尝试使用给出的示例。
下面的代码片段是用来初始化一个ConsumerConnector的,我被topic count参数搞糊涂了;似乎它会导致 kafka 为该主题分发相应数量的流。但是,我尝试了几次,只有第一个流产生消息。所以,有两个问题:1.如何确定一个主题的计数?2. 消息如何跨流拆分?
提前致谢。
spring - spring-integration-kafka 配置消费者从指定分区接收消息
我开始在我的项目中使用 spring-integration-kafka,我可以生成和使用来自 Kafka 的消息。但是现在,我想向特定分区生成消息,并从特定分区消费消息。
示例我想向分区 3 生成消息,而消费只会接收来自分区 3 的消息。
到目前为止,我的主题有 8 个分区,我可以向特定分区生成消息,但我还没有找到配置消费者只接收来自特定分区的消息的方法。
因此,关于我应该如何使用 spring-integration-kafka 配置消费者的任何建议,或者任何其他需要与 KafkaConsumer.java 类有关的建议,它都可以从特定分区接收消息。
谢谢。
这是我的代码:
kafka-producer-context.xml
KafkaProducer.java
}
kafka-consumer-context.xml
KafkaConsumer.java
所以我的问题就在这里。当我向分区 3 或任何分区生成消息时,KafkaConsumer 总是会收到消息。我想要的是:KafkaConsumer 只会接收来自分区 3 的消息,而不是来自其他分区的消息。
再次感谢。
java - Kafka 消息 - Java 中的生产者和消费者客户端
提前感谢您的帮助。
我想在 Kafka 代理中以以下格式推送和拉取消息。
Java 中的生产者客户端应该能够以这种格式推送,消费者客户端应该能够读取和解析此消息。我怎样才能做到这一点?在 Kafka Java API 中是否有任何特定的方法可以做到这一点?
我已经编写了生产者和消费者 java 客户端来推送和拉取简单的文本消息。
非常感谢您的帮助。
再次感谢。
java - 插入 HBase 读取 Kafka 消息
我对 Kafka + Hbase 很陌生。感谢你的帮助。
我们得到了一个 Kafka 消息流,并希望将数据插入到从 Kafka 读取的 Hbase 中。
我编写了一个 java 消费者(单主题、单分区和一个线程)来读取来自主题的消息。一切皆好。在线程中,我试图连接 Hbase 以将消息插入到不起作用的表中。当它试图连接到 hbase(创建 HTable 的实例)时,读取流的 kafka 线程被杀死了。如何克服这一点将数据插入 Hbase?非常感谢您在这个问题上的想法和帮助。
Kafka 高级消费者代码
}
scala - Scala:将类传递给另一个类时,类接受类型参数?
我正在尝试创建一个可运行的类以与线程一起使用。该类的目的是获取一个KafkaStream并将一个函数应用于通过该流的每条消息,但是当我尝试将流传递给该类时,我收到以下编译器错误:“scala:45: class KafkaStream take type参数”。
这是课程。这个问题一定与如何将参数传递给 Scala 中的类有关,但我对 Scala 还是很陌生。
另外是否可以使函数可运行?我认为传递给函数比传递给类更容易。
java - 如何从 Java 中的 Kafka 服务器(不是 Zookeeper)获取所有 ConsumerGroup 的列表
Kafka 中是否有任何 Java Api 可以为我提供所有消费者组的列表以及他们正在消费的主题/分区此外,有没有什么方法可以从 kafka 服务器端获取 zookeeper 列表。注意:我可以从 Zookeeper 获取上述信息。但我想从 Kafka Server 获取它。你能帮我解决这个问题吗!!
提前致谢!!
apache-spark - 如何通过 SimpleConsumer 在 Kafka 中获取没有获取请求的消息的大小(元数据)?
我正在使用 SimpleConsumer 并尝试使用 spark 明智地提取消息大小(字节)。
我能够使用元数据请求获得最早和最新的偏移量,但不知道如何获取 kafka 中的字节数(0.8.0)。
我真的不想使用获取请求,因为想要检查足够的数据然后只会运行火花作业(而不是流式传输)来执行操作。
apache-kafka - DSE Spark Streaming+Kafka NoSuchMethodError
我正在尝试提交一个火花流 + kafka 作业,它只是从 kafka 主题中读取字符串行。但是,我收到以下异常
15/07/24 22:39:45 ERROR TaskSetManager: 阶段 2.0 中的任务 0 失败 4 次;线程“Thread-49”org.apache.spark.SparkException 中的中止作业异常:作业因阶段失败而中止:阶段 2.0 中的任务 0 失败 4 次,最近一次失败:阶段 2.0 中丢失任务 0.3(TID 73、10.11。 112.93): java.lang.NoSuchMethodException: kafka.serializer.StringDecoder.(kafka.utils.VerifiableProperties) java.lang.Class.getConstructor0(Class.java:2892) java.lang.Class.getConstructor(Class.java:1723) org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106) org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) org.apache.spark.streaming.receiver。 ReceiverSupervisor.start(ReceiverSupervisor.scala:106) org.apache.spark。
当我检查 DSE 使用的 spark jar 文件时,我发现它使用了 kafka_2.10-0.8.0.jar,它确实具有该构造函数。不确定是什么导致了错误。这是我的消费者代码
更新此异常似乎仅在我提交作业时发生。如果我使用 spark shell 通过粘贴代码来运行作业,它可以正常工作