问题标签 [apache-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1012 浏览

java - 在eclipse中运行Kafka源码时出现的问题

情况是:我的Kafka项目正常,用Linux命令行运行“快速启动”,没有问题。然后我构建了一个Scala项目(Kafka在Scala中)并在/core目录中导入源代码,将包放在构建路径中,有错误,所以我尝试只在main/中导入包这是核心目录:

导入后的情况是:项目中有scala和java文件,错误日志说java文件找不到,项目中确实导入了一些scala文件。我应该如何处理该项目?如何正确导入scala java混合项目?

0 投票
4 回答
55378 浏览

apache-kafka - 使用 Kafka 进行数据建模?主题和分区

在使用新服务(例如非 RDBMS 数据存储或消息队列)时,我首先想到的事情之一是:“我应该如何构造我的数据?”。

我已经阅读并观看了一些介绍性材料。特别是,以Kafka:一个用于日志处理的分布式消息传递系统为例,它写道:

  • “主题是与消息关联的容器”
  • “并行度的最小单位是主题的分区。这意味着......属于某个主题的特定分区的所有消息都将被消费者组中的消费者消费。”

知道了这一点,什么是说明如何使用主题和分区的好例子?什么时候应该成为话题?什么时候应该是一个分区?

例如,假设我的 (Clojure) 数据如下所示:

主题应该基于user-id? viewed? at? 分区呢?

我该如何决定?

0 投票
1 回答
10176 浏览

scala - kafka Producer 上的“ClassCastException:kafka.message.Message 无法转换为 java.lang.String”

我正在尝试编写一些简单的生产者来将消息写入 kafka 我已经下载了 kafka0.7

运行kafka服务器,生产者和消费者

一切正常!

然后我创建了简单的消费者,它也可以工作。

但是下面的生产者总是抛出错误

得到以下异常

我在java代码中使用maven依赖项

请帮忙,我做错了什么?为什么我不能通过简单的 java 代码编写?

0 投票
4 回答
3584 浏览

hdfs - Kafka Storm HDFS/S3 数据流

目前尚不清楚您是否可以像在 Flume 中那样在 Kafka 中进行扇出(复制)。

我想让 Kafka 将数据保存到 HDFS 或 S3 并将该数据的副本发送到 Storm 以进行实时处理。Storm 聚合/分析的输出将存储在 Cassandra 中。我看到一些实现将所有数据从 Kafka 流入 Storm,然后从 Storm 流出两个输出。但是,我想消除 Storm 对原始数据存储的依赖。

这可能吗?您是否知道任何这样的文档/示例/实现?

另外,Kafka 对 S3 存储有很好的支持吗?

我看到 Camus 用于存储到 HDFS——你只是通过 cron 运行这项工作来不断地将数据从 Kafka 加载到 HDFS 吗?如果第二个作业实例在前一个作业完成之前开始,会发生什么?最后,Camus 会与 S3 一起工作吗?

谢谢,我很感激!

0 投票
2 回答
15205 浏览

java - 使用 Kafka Spout 的 Kafka Storm 集成

我正在使用 KafkaSpout。请在下面找到测试程序。

我正在使用 Storm 0.8.1。Storm 0.8.2 中有 Multischeme 类。我会用那个。我只想通过实例化 StringScheme() 类来了解早期版本是如何工作的?我在哪里可以下载早期版本的 Kafka Spout?但我怀疑这不是在 Storm 0.8.2 上工作的正确选择。???(使困惑)

当我在storm集群上运行代码(如下所示)时(即当我推送我的拓扑时)我得到以下错误(当Scheme部分被注释时会发生这种情况,当然我会得到编译器错误,因为该类在0.8中不存在.1):

在下面给出的代码中,您可能会发现 spoutConfig.scheme=new StringScheme(); 部分评论。如果我不评论那条很自然的行,我会收到编译器错误,因为那里没有构造函数。此外,当我实例化 MultiScheme 时,我会收到错误,因为我在 0.8.1 中没有该类。

0 投票
2 回答
2124 浏览

apache-zookeeper - 卡夫卡 0.8 一切都好,摇滚!.... Kafka 0.7 无法实现

Kafka 0.8 效果很好。我能够使用 CLI 以及编写我自己的生产者/消费者!

检查 Zookeeper... 我看到所有为 0.8 成功创建的主题和分区。

卡夫卡 0.7 不工作!

为什么选择 Kafka 0.7?我正在使用为 Kafka 0.7 制作的 Storm 的 Kafka Spout。

首先,我只想为 Kafka 0.7 运行基于 CLI 的生产者/消费者,但我无法做到。我执行以下步骤:

  1. 我删除了 Zookeeper 中从我的 Kafka 0.8 创建的所有主题/分区等
  2. 我将 zoo.cfg 中的 dataDir 更改为指向不同的位置。
  3. 现在我启动 kafka 服务器 0.7。它成功启动。但是我不知道为什么它会再次注册我删除的代理主题?
  4. 现在我启动 Kafka Producer :

    bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic topicime & 它成功启动:[2013-06-28 14:06:05,521] INFO zookeeper 状态已更改 (SyncConnected) (org.I0Itec.zkclient. ZkClient) [2013-06-28 14:06:05,606] INFO 在 0:0 (kafka.producer.ProducerPool) 为代理 id = 0 创建异步生产者

  5. 是时候发送一些消息了,哎呀,我收到了这个错误:

    [2013-06-28 14:07:19,650] 信息从 0:0 断开连接(kafka.producer.SyncProducer)[2013-06-28 14:07:19,653] 错误连接尝试到 0:0 失败,下次尝试在 1 ms (kafka.producer.SyncProducer) java.net.ConnectException: 在 sun.nio.ch.Net.connect(Net.java:364) 处 sun.nio.ch.Net.connect0(Native Method) 处拒绝连接。 nio.ch.Net.connect(Net.java:356) 在 sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623) 在 kafka.producer.SyncProducer.connect(SyncProducer.scala:173) 在 kafka.producer .SyncProducer.getOrMakeConnection(SyncProducer.scala:196) 在 kafka.producer.SyncProducer.send(SyncProducer.scala:92) 在 kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135) 在 kafka.producer.async.DefaultEventHandler。发送(DefaultEventHandler.scala:58) 在 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44) 在 kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116) 在 scala.collection.immutable.Stream.foreach(Stream.scala :254) 在 kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70) 在 kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)

请注意,Zookeeper 已经在运行。

任何帮助将不胜感激。

编辑:

我什至没有看到在 zookeeper 中创建的主题。我正在运行以下命令:

命令后一切正常,我收到以下消息:

但是现在当我输入要发送的字符串时,出现上述错误(连接被拒绝!)

0 投票
5 回答
26177 浏览

hadoop - 将文件分区为日期的从kafka写入hdfs的最有效方法是什么

我正在研究应该通过 kafka 写入 hdfs 的项目。假设有在线服务器将消息写入 kafka。每条消息都包含时间戳。我想根据消息中的时间戳创建一个输出将是文件/文件的作业。例如如果kafka中的数据是

我想将 3 个文件作为输出

当然,如果我再次运行这项工作并且队列中有一条新消息,例如

它应该创建一个文件

我看过一些开源,但其中大多数从 kafka 读取到一些 hdfs 文件夹。这个问题的最佳解决方案/设计/开源是什么

0 投票
0 回答
1759 浏览

classnotfoundexception - Apache Kafka:找不到主类

我正在尝试完成设置 kafka 的基本快速入门(可在此处找到:http: //kafka.apache.org/07/quickstart.html)。我已经运行了 sbt update 和 package 命令。不幸的是,当我运行时:

bin/zookeeper-server-start.sh 配置/zookeeper.properties

我看到错误“找不到主类 ...QuorumPeerMain”

同样,当我运行时:

bin/kafka-server-start.sh 配置/server.properties

我看到错误“找不到主类 ... kafka.Kafka”

有没有人见过类似的问题?我已经检查过了,看起来 zookeeper jar 和 kafka jar 在类路径中。如果有帮助,我有 7.1 版并且正在 Windows 上运行。非常感谢!

0 投票
4 回答
5062 浏览

java - 卡夫卡点对点

问题

我们有一个多数据中心的 activeMQ 设置,每个 HA 对都有 NFS,而且似乎 activeMQ 不是真正可扩展的,并且不能很好地解决 NFS 问题。(我们使用的是 5.7)

可能的解决方案

搬到卡夫卡

要求

  • 我们需要点对点和发布/订阅功能
  • 消息优先级(我知道 kafka 没有提供开箱即用的功能,但我们这边有一个解决方法)

问题

这对 Kafka 是否可行(不一定是开箱即用的,但需要一些客户端调整)?如果不是,那么您会建议什么其他技术?它不一定是 JMS,但它需要具有可扩展性和可靠性(并且它需要与 NFS 配合得很好)

0 投票
1 回答
4562 浏览

java - Maven重复标签“依赖”错误

我正在尝试从http://search.maven.org/#artifactdetails%7Corg.apache.kafka%7Ckafka_2.9.2%7C0.8.0-beta1%7CN%2FA中包含 apache-kafka

但是当我将它包含在我的 pom.xml 中并运行“mvn package”时,我收到以下错误:

我在这里做错了吗?看起来 maven 对“依赖项”被使用两次并不满意。任何帮助,将不胜感激。