我正在尝试安装一个 Kafka-Storm “Hello World”系统。我已经安装并运行了 Kafka,当我使用 Kafka 生产者发送数据时,我可以使用 Kafka 控制台消费者读取它。
我从 O'Reilly 的“Getting Started With Storm”一书中选取了第 2 章的示例,并将其修改为使用 KafkaSpout 而不是常规的 spout。
当我运行应用程序时,数据已经在 kafka 中挂起,KafkaSpout 的 nextTuple 没有收到任何消息 - 它进入,尝试遍历协调器下的空管理器列表,然后退出。
我的环境是一个相当老的 Cloudera VM,带有 Storm 0.9 和 Kafka-Storm-0.9(最新),以及 Kafka 2.9.2-0.7.0。
这就是我定义 SpoutConfig 和拓扑的方式:
String zookeepers = "localhost:2181";
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
"gtest",
"/kafka", // zookeeper root path for offset storing
"KafkaSpout");
spoutConfig.forceStartOffsetTime(-1);
KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
有人可以帮我弄清楚为什么我没有收到任何东西吗?
谢谢,G。