问题标签 [kafka-consumer-api]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Apache Kafka Consumer 有时不会在 Windows 7 中写入控制台:“无法写入标准输出,正在关闭消费者。”
Apache Kafka 消费者返回“无法写入标准输出,正在关闭消费者”。在某些情况下在 Windows 7 中。它适用于 Linux 和 Windows 8 中的相同消息以及 Windows 7 中的其他较短消息。我想知道是否有人已经遇到过这个问题?是否取决于消息的大小以及如何解决此问题?
java - kafka spout 没有发出数据
我正在尝试将 Kafka 与 Storm 集成。我正在使用 Kafka Spout 从 Kafka 主题中检索数据并将其提供给风暴螺栓以进行进一步处理。我能够成功提交拓扑,但 spout 没有发出任何数据。它也不会引发任何错误。我对 Kafka 和 Storm 很陌生。所以,我无法找到这个问题背后的原因。请提出修改建议。提前致谢!
我的拓扑:
kafka-consumer-api - kafka消费者迭代器是如何工作的
我发现这段代码成功读取了 kafka 主题并在屏幕上打印了每条消息。我想扩展它以对字符串执行其他操作,而不仅仅是在屏幕上打印。为此,我想了解迭代消息的 while 循环中发生了什么。it.hasNext() 有什么作用?它是否查找下一条消息或新消息列表。它什么时候会退出这个while循环?
apache-kafka - Kafka 消费者获取主题元数据失败
我正在尝试为第三方的 Kafka 和 ZooKeeper 服务器编写 Java 客户端。我能够列出和描述主题,但是当我尝试阅读任何主题时,会出现 a ClosedChannelException
。我在这里使用命令行客户端重现它们。
备用命令成功:
(ips 被编辑并替换为 255.255.255.255)
当我在谷歌上搜索这个异常时,我看到了生产者方面的问题——事实上,来源ClientUtils.fetchTopicMetadata
暗示这主要是由生产者使用的。
我担心的一个问题是,这可能是网络布局的产物:数据包被 Haproxy 破坏并通过 VPN 发送。
究竟是什么在这里工作?
apache-kafka - Kafka 多个消费者用于一个分区
我有一个生产者将消息写入主题/分区。为了保持排序,我想使用单个分区,我希望 12 个消费者从这个单个分区读取所有消息(没有消费者组,所有消息都应该发送给所有消费者)。这是可以实现的吗?我阅读了一些论坛,每个分区只有一个消费者可以阅读。
apache-kafka - kafka:“soTimeout”、“bufferSize”和“minBytes”对 SimpleConsumer 意味着什么?
我正在使用 Kafka 0.8.2.1 SimpleConsumer 。有人可以澄清 SimpleConsumer 和 FetchRequestBuilder 的一些配置参数的含义吗?由于没有阅读 KAfka 的源代码,我当时找不到任何文档。(我尝试将这个问题发布到 kafka 用户组 - 但没有运气):
-- Q1:在 SimpleConsumer 构造函数的签名中,我看到了 Int ' soTimeout'参数——这个超时是什么意思?这是连接到 Kafka 代理的超时吗?从任何 [或特定??] 对 Kafka 的请求(如 FetchRequest)获得响应时超时?还有什么?
-- Q2: 另外,SimpleConsumer 构造函数采用 Int 'bufferSize' 参数。它的意义是什么?这是发出 fetchRequest 时 SimpleConsumer 将读取多少字节?或者它是每次从 Kafka 读取的最大字节数 - 如果有更多数据可用,会发生多次读取?
-- 通过 FetchRequestBuilder(见下文)构建 FetchRequest 时,我还需要指定“ fetchSize ”:
查看 FetchRequestBuilder 的源代码,我认为(我不是 Scala 专业人士)这些调用转换为以下方法调用 - 并且传递给 FetchRequest 的最终参数称为“ minBytes ”,暗示这不是确切的获取大小,可能吗?. 这是否意味着它甚至不会获取任何东西,除非至少有“minBytes”的数据可用?
所以,我的最后一个问题是:
-- Q3:' bufferSize '和' fetchSize/minBytes '有什么关系?他们到底定义了什么?我必须确保一个比另一个更小或更大吗?
谢谢,
码头
apache-kafka - Kafka Consumer 从多个主题中读取
我对卡夫卡很陌生。我正在创建两个主题并从两个生产者那里发布这两个主题。我有一个消费者消费来自两个主题的消息。这是因为我想按照优先级进行处理。
我从两个主题中都得到了一个流,但是一旦我开始迭代ConsumerItreator
任何流,它就会在那里阻塞。正如它在文档中所写的那样,它将被阻止,直到它收到一条新消息。
有人知道如何从单个 Kafka 消费者中读取两个主题和两个流吗?
java - 如何在 Eclipse 下使用自定义 Kafka 生产者修复 NoClassDefFoundError?
我正在尝试在我的项目中包含 Kafka 模块。
我在 Eclipse 中添加了以下 jar 作为外部 jar 库,并且还更新了 build.xml 以包含对 jar 的引用:
- kafka-clients-0.8.2.0.jar
- kafka_2.10-0.8.2.0.jar
- scala-library-2.10.4.jar
我写了一个示例 Producer 类
在我开始这个项目之前,我已经确保 Zookeeper 和 Kafka 代理正在运行。但是,我看到NoClassDefFoundError()
它何时尝试实例化new KafkaProducer()
.
我错过了一些明显的东西吗?
apache-kafka - 如何使用 Kafka 0.8.2 的 Consumer API?
我从最新的 Kafka 文档http://kafka.apache.org/documentation.html开始。但是当我尝试使用新的消费者 API 时遇到了一些问题。我已经通过以下步骤完成了这项工作:
1.添加新的依赖
2.添加配置
3.使用KafkaConsumer API
但是,当我尝试从代理轮询消息时,我只得到空值:
然后在我检查了源代码后,我知道消费者出了什么问题:
更糟糕的是,我找不到有关 0.8.2 API 的任何其他有用信息,因为有关 Kafka 的所有用法都与最新版本不兼容。有人可以帮助我吗?非常感谢。
java - Kafka 获取偏移数据时出错。原因:1
我正在运行一个使用 kafka 库的 java 程序,并每 1 秒检查一次来自 zookeeper 的消费者组的提交偏移量。程序运行良好约 2 小时并开始抛出 RuntimeException:
...
原因1是什么?我找不到任何文件或页面来说明这种情况的根本原因。