1

我正在尝试将 Kafka 与 Storm 集成。我正在使用 Kafka Spout 从 Kafka 主题中检索数据并将其提供给风暴螺栓以进行进一步处理。我能够成功提交拓扑,但 spout 没有发出任何数据。它也不会引发任何错误。我对 Kafka 和 Storm 很陌生。所以,我无法找到这个问题背后的原因。请提出修改建议。提前致谢!

提交拓扑后 Storm UI 的屏幕截图

我的拓扑:

public class TopologyMain {

 private static final String SENTENCE_SPOUT_ID = "kafka-sentence-spout";


public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
    int numSpoutExecutors = 1;


    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout(SENTENCE_SPOUT_ID, buildKafkaSentenceSpout(), numSpoutExecutors);
    builder.setBolt("word-normalizer", new WordNormalizer())
        .shuffleGrouping(SENTENCE_SPOUT_ID);
    builder.setBolt("word-counter", new WordCounter(),2)
        .shuffleGrouping("word-normalizer");

    //Configuration
    Config conf = new Config();
    conf.setDebug(false);
    //Topology run
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    conf.put(Config.NIMBUS_HOST, "192.168.1.229");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    System.setProperty("storm.jar", "/home/ubuntu/st/stIn/target/storm-wc.jar");
    StormSubmitter.submitTopology("Count-Word-Topology", conf,builder.createTopology());

}



 private static KafkaSpout buildKafkaSentenceSpout() {
      BrokerHosts hosts = new ZkHosts("localhost:2181");
      SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/acking-kafka-sentence-spout", "acking-sentence-spout");
      spoutConfig.forceFromStart = true;
      spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
      return new KafkaSpout(spoutConfig);
    }
 }
4

1 回答 1

0

我明确地将项目的 Maven 依赖项中的所有 jar 复制到了storm库,并且一切正常。我还将storm jar(用于提交拓扑的jar)复制到了storm/lib。

于 2015-05-28T05:11:09.003 回答