3

我正在尝试安装一个 Kafka-Storm “Hello World”系统。我已经安装并运行了 Kafka,当我使用 Kafka 生产者发送数据时,我可以使用 Kafka 控制台消费者读取它。

我从 O'Reilly 的“Getting Started With Storm”一书中选取了第 2 章的示例,并将其修改为使用 KafkaSpout 而不是常规的 spout。

当我运行应用程序时,数据已经在 kafka 中挂起,KafkaSpout 的 nextTuple 没有收到任何消息 - 它进入,尝试遍历协调器下的空管理器列表,然后退出。

我的环境是一个相当老的 Cloudera VM,带有 Storm 0.9 和 Kafka-Storm-0.9(最新),以及 Kafka 2.9.2-0.7.0。

这就是我定义 SpoutConfig 和拓扑的方式:

String zookeepers = "localhost:2181";

SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest",
        "/kafka",  // zookeeper root path for offset storing
        "KafkaSpout");
spoutConfig.forceStartOffsetTime(-1);

KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);


//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
    .shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
    .fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());

有人可以帮我弄清楚为什么我没有收到任何东西吗?

谢谢,G。

4

4 回答 4

4

如果您已经消费了该消息,则不应再阅读该消息,除非您的生产者产生新消息。这是因为您的代码中的forceStartOffsetTime调用-1

形成storm-contrib文档:

spout 中另一个非常有用的配置是强制 spout 回退到以前的偏移量的能力。您在 spout 配置上执行 forceStartOffsetTime ,如下所示:

   spoutConfig.forceStartOffsetTime(-2);

它将选择围绕该时间戳写入的最新偏移量来开始消费。您可以通过传入 -1 强制 spout 始终从最新的偏移量开始,也可以通过传入 -2 强制它从最早的偏移量开始。

你的制片人长什么样?有一个片段会很有用。您可以将 -1 替换为 -2 并查看是否收到任何内容,如果您的生产者很好,那么您应该可以消费。

于 2013-08-21T19:34:31.327 回答
1
SpoutConfig spoutConf = new SpoutConfig(...)
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
于 2014-12-26T07:02:51.383 回答
0

我经历了一些悲伤的风暴和卡夫卡的整合。这些都是快速发展且相对年轻的项目,因此很难获得工作示例来快速开始您的开发。

为了帮助其他开发人员(并希望其他人贡献我也可以使用的有用示例),我启动了一个 github 项目来存放与 Storm/Kafka(和 Esper)开发相关的代码片段。

欢迎您在这里查看> https://github.com/buildlackey/cep

(单击storm+kafka 目录可以找到一个可以帮助你启动并运行的示例程序)。

于 2013-10-25T21:56:58.680 回答
0
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest", // name of topic used by producer & consumer
        "/kafka",  // zookeeper root path for offset storing
        "KafkaSpout");

您正在使用“gtest”主题来接收数据。确保您是由生产者从该主题发送数据。

在螺栓中,像这样打印那个元组

public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple);
    }

它应该在 kafka 中打印待处理的数据。

于 2013-08-05T07:06:38.557 回答