问题标签 [apache-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5909 浏览

apache-storm - Storm-kafka 喷口消耗缓慢

我只是在尝试这里提到的 kafka-storm spout https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka,我使用的配置如下所述。

但是上面的 spout 从 Kafka 主题获取消息的速度大约是 7000 条/秒,但我预计每秒负载大约 50000 条消息。我尝试了各种增加 spoutConfig 中提取缓冲区大小的选项,但没有可见的结果。

有没有人遇到过类似的问题,他无法以生产者生成消息的速度通过风暴获取 kafka 主题?

0 投票
1 回答
3997 浏览

streaming - 如何整合 Storm 和 Kafka

我曾在Storm工作并开发了一个使用本地文本文件作为输入源的基本程序。但现在我必须处理来自外部系统的连续流数据。为此,Kafka 是最佳选择。

问题是如何让我的 Spout 从 Kafka 获取流数据。或者如何将 Storm 与 Kafka 集成。我该怎么做才能处理来自 Kafka 的数据?

0 投票
1 回答
3131 浏览

java - 让 kafka 消费者永远运行

我编写了一个高级 Kafka 消费者作为 Java 应用程序的一部分。

所以核心代码是这样的:

为了测试我的消费者,我还创建了一个生产者,写信给 kafka,然后启动了我的消费者,它可以工作。由于线程是在循环中执行的,我不确定我是否做对了。我希望我的消费者永远运行并继续使用来自 kafka 的消息。

让它永远运行的正确方法是什么?

0 投票
2 回答
14976 浏览

compression - Kafka 消息编解码器 - 压缩和解压

使用kafka时,我可以通过设置我的 kafka 生产者的 kafka.compression.codec 属性来设置编解码器。

假设我在生产者中使用 snappy 压缩,当使用一些 kafka 消费者消费来自 kafka 的消息时,我应该做些什么来解码来自 snappy 的数据,还是它是 kafka 消费者的一些内置功能?

相关文档中,我找不到与 kafka 消费者中的编码相关的任何属性(它仅与生产者有关)。

有人可以清除这个吗?

0 投票
2 回答
623 浏览

apache-kafka - kafka 0.72,最少经纪人数量

我正在尝试创建一个kafka生产者,它将消息发送到 kafka 经纪人(而不是动物园管理员)。

我知道更好的做法是使用 zk,但目前我想直接向代理发送消息。

为此,我按照文档中的描述设置属性“broker.list” 。问题是,为了让它工作,它至少需要 3 个经纪人(否则我得到一个例外)。

在kafka的源代码中我可以看到:

这很奇怪,因为在我的数据中心我只拥有 2 个 kafka 节点(和 3 个 zk),在这种情况下我该怎么办?有没有办法解决这个问题?

0 投票
1 回答
1042 浏览

apache-kafka - Kafka为消息添加前缀

使用kafka 7.2,当使用生产者发送消息时,我发现一旦使用它,消息就会在消息的开头带有附加部分。

例如,当向 kafka 发送一个简单的字符串“King Daniel”时,它在字节数组中如下所示:

但是当我出于某种原因消费它时,我得到:

哪个是字符串“........ֲִ.|King Daniel”

所以我在消息的开头还有 12 个字符。这是某种标题吗?我怎样才能得到我的原始信息?

这是我的消费者代码:

所以我正在将其写入msg.message().payload().array()一个文件,然后当我打开这个文件时,我可以看到原始内容,并在开头添加了 12 个额外的字符。

我怎样才能得到我确切的原始信息?

0 投票
1 回答
783 浏览

hbase - 使用storm从kafka获取消息

如何从动物园管理员那里获得最后的偏移时间?使用storm spout从kafka读取消息时。上下文:Kafka不断获取消息,consumer读取一段时间后由于任何原因关闭,然后consumer只读取最新消息但不从最后一个偏移读取

0 投票
1 回答
2199 浏览

hbase - 区分kafka中的已读和未读消息

有什么办法可以找到

  • 尚未使用消息的偏移量或时间戳

  • 最后一个被消费的时间戳

或者使用storm来区分kafka中的已读和未读消息

0 投票
3 回答
3819 浏览

apache-storm - Kafka Storm spout 更改拓扑并从旧偏移量消费

我正在使用 kafka spout 来消费消息。但是如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka spout 让我们指定从哪里消费的时间戳,但我怎么知道时间戳?

0 投票
2 回答
15969 浏览

java - 未能执行目标 org.codehaus.mojo:exec-maven-plugin:1.2.1 - kafka Storm 集成

我正在研究 kafka strom 集成。我遇到了一个错误。当我尝试使用运行它时构建失败
mvn -e -f m2-pom.xml compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=storm.starter.MainTopology

这是 pom.xml 文件的片段:

我试过了

我正在使用storm-0.9.0-rc3和kafka-0.7.2