问题标签 [kafka-consumer-api]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1547 浏览

apache-kafka - Apache Kafka Consumer 有时不会在 Windows 7 中写入控制台:“无法写入标准输出,正在关闭消费者。”

Apache Kafka 消费者返回“无法写入标准输出,正在关闭消费者”。在某些情况下在 Windows 7 中。它适用于 Linux 和 Windows 8 中的相同消息以及 Windows 7 中的其他较短消息。我想知道是否有人已经遇到过这个问题?是否取决于消息的大小以及如何解决此问题?

0 投票
1 回答
918 浏览

java - kafka spout 没有发出数据

我正在尝试将 Kafka 与 Storm 集成。我正在使用 Kafka Spout 从 Kafka 主题中检索数据并将其提供给风暴螺栓以进行进一步处理。我能够成功提交拓扑,但 spout 没有发出任何数据。它也不会引发任何错误。我对 Kafka 和 Storm 很陌生。所以,我无法找到这个问题背后的原因。请提出修改建议。提前致谢!

提交拓扑后 Storm UI 的屏幕截图

我的拓扑:

0 投票
2 回答
4393 浏览

kafka-consumer-api - kafka消费者迭代器是如何工作的

我发现这段代码成功读取了 kafka 主题并在屏幕上打印了每条消息。我想扩展它以对字符串执行其他操作,而不仅仅是在屏幕上打印。为此,我想了解迭代消息的 while 循环中发生了什么。it.hasNext() 有什么作用?它是否查找下一条消息或新消息列表。它什么时候会退出这个while循环?

0 投票
4 回答
62602 浏览

apache-kafka - Kafka 消费者获取主题元数据失败

我正在尝试为第三方的 Kafka 和 ZooKeeper 服务器编写 Java 客户端。我能够列出和描述主题,但是当我尝试阅读任何主题时,会出现 a ClosedChannelException。我在这里使用命令行客户端重现它们。

备用命令成功:

(ips 被编辑并替换为 255.255.255.255)

当我在谷歌上搜索这个异常时,我看到了生产者方面的问题——事实上,来源ClientUtils.fetchTopicMetadata暗示这主要是由生产者使用的。

我担心的一个问题是,这可能是网络布局的产物:数据包被 Haproxy 破坏并通过 VPN 发送。

究竟是什么在这里工作?

0 投票
2 回答
10065 浏览

apache-kafka - Kafka 多个消费者用于一个分区

我有一个生产者将消息写入主题/分区。为了保持排序,我想使用单个分区,我希望 12 个消费者从这个单个分区读取所有消息(没有消费者组,所有消息都应该发送给所有消费者)。这是可以实现的吗?我阅读了一些论坛,每个分区只有一个消费者可以阅读。

0 投票
1 回答
1700 浏览

apache-kafka - kafka:“soTimeout”、“bufferSize”和“minBytes”对 SimpleConsumer 意味着什么?

我正在使用 Kafka 0.8.2.1 SimpleConsumer 。有人可以澄清 SimpleConsumer 和 FetchRequestBuilder 的一些配置参数的含义吗?由于没有阅读 KAfka 的源代码,我当时找不到任何文档。(我尝试将这个问题发布到 kafka 用户组 - 但没有运气):

-- Q1:在 SimpleConsumer 构造函数的签名中,我看到了 Int ' soTimeout'参数——这个超时是什么意思?这是连接到 Kafka 代理的超时吗?从任何 [或特定??] 对 Kafka 的请求(如 FetchRequest)获得响应时超时?还有什么?

-- Q2: 另外,SimpleConsumer 构造函数采用 Int 'bufferSize' 参数。它的意义是什么?这是发出 fetchRequest 时 SimpleConsumer 将读取多少字节?或者它是每次从 Kafka 读取的最大字节数 - 如果有更多数据可用,会发生多次读取?

-- 通过 FetchRequestBuilder(见下文)构建 FetchRequest 时,我还需要指定“ fetchSize ”:

查看 FetchRequestBuilder 的源代码,我认为(我不是 Scala 专业人士)这些调用转换为以下方法调用 - 并且传递给 FetchRequest 的最终参数称为“ minBytes ”,暗示这不是确切的获取大小,可能吗?. 这是否意味着它甚至不会获取任何东西,除非至少有“minBytes”的数据可用?

所以,我的最后一个问题是:

-- Q3:' bufferSize '和' fetchSize/minBytes '有什么关系?他们到底定义了什么?我必须确保一个比另一个更小或更大吗?

谢谢,

码头

0 投票
1 回答
12720 浏览

apache-kafka - Kafka Consumer 从多个主题中读取

我对卡夫卡很陌生。我正在创建两个主题并从两个生产者那里发布这两个主题。我有一个消费者消费来自两个主题的消息。这是因为我想按照优先级进行处理。

我从两个主题中都得到了一个流,但是一旦我开始迭代ConsumerItreator任何流,它就会在那里阻塞。正如它在文档中所写的那样,它将被阻止,直到它收到一条新消息。

有人知道如何从单个 Kafka 消费者中读取两个主题和两个流吗?

0 投票
1 回答
3577 浏览

java - 如何在 Eclipse 下使用自定义 Kafka 生产者修复 NoClassDefFoundError?

我正在尝试在我的项目中包含 Kafka 模块。

我在 Eclipse 中添加了以下 jar 作为外部 jar 库,并且还更新了 build.xml 以包含对 jar 的引用:

  • kafka-clients-0.8.2.0.jar
  • kafka_2.10-0.8.2.0.jar
  • scala-library-2.10.4.jar

我写了一个示例 Producer 类

在我开始这个项目之前,我已经确保 Zookeeper 和 Kafka 代理正在运行。但是,我看到NoClassDefFoundError()它何时尝试实例化new KafkaProducer().

我错过了一些明显的东西吗?

0 投票
2 回答
7749 浏览

apache-kafka - 如何使用 Kafka 0.8.2 的 Consumer API?

我从最新的 Kafka 文档http://kafka.apache.org/documentation.html开始。但是当我尝试使用新的消费者 API 时遇到了一些问题。我已经通过以下步骤完成了这项工作:

1.添加新的依赖

2.添加配置

3.使用KafkaConsumer API

但是,当我尝试从代理轮询消息时,我只得到空值:

然后在我检查了源代码后,我知道消费者出了什么问题:

更糟糕的是,我找不到有关 0.8.2 API 的任何其他有用信息,因为有关 Kafka 的所有用法都与最新版本不兼容。有人可以帮助我吗?非常感谢。

0 投票
1 回答
1411 浏览

java - Kafka 获取偏移数据时出错。原因:1

我正在运行一个使用 kafka 库的 java 程序,并每 1 秒检查一次来自 zookeeper 的消费者组的提交偏移量。程序运行良好约 2 小时并开始抛出 RuntimeException:

...

原因1是什么?我找不到任何文件或页面来说明这种情况的根本原因。