17

我在Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload使用 IDE 向生产集群提交拓扑时遇到了一个问题,而如果我在命令行中使用storm jar命令执行同样的事情,它的运行就像天堂一样。我从githublink看到了同样的例子。

对于提交拓扑,我正在使用这些行

conf.put(Config.NIMBUS_HOST, NIMBUS_NODE);
conf.put(Config.NIMBUS_THRIFT_PORT,6627);
conf.put(Config.STORM_ZOOKEEPER_PORT,2181);
conf.put(Config.STORM_ZOOKEEPER_SERVERS,ZOOKEEPER_ID);
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter submitter = new StormSubmitter();
submitter.submitTopology("test", conf, builder.createTopology());

如果这是正确的运行方法,请建议我?

4

4 回答 4

29

很好地找到了解决方案。当我们运行“storm jar”时,它会在提交的 jar 中触发storm.jar 的属性标志。因此,如果我们想以编程方式提交一个 jar,那么只需以这种方式设置标志

System.setProperty("storm.jar", <path-to-jar>);

例如:

System.setProperty("storm.jar", "/Users/programming/apache-storm-1.0.1/lib/storm-core-1.0.1.jar");
StormSubmitter.submitTopology("myTopology", config, builder.createTopology());
于 2013-04-05T11:10:35.413 回答
5

要将拓扑提交到远程 Storm 集群,您需要将该 jar 上传到 nimbus 机器,然后使用 NimbusClient 将该 jar 提交到集群。
你可以这样做:

Map storm_conf = Utils.readStormConfig();
storm_conf.put("nimbus.host", "<Nimbus Machine IP>");
Client client = NimbusClient.getConfiguredClient(storm_conf)
                                .getClient();
String inputJar = "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar";
NimbusClient nimbus = new NimbusClient(storm_conf, "<Nimbus Machine IP>",
                                <Nimbus Machine Port>);
 // upload topology jar to Cluster using StormSubmitter
String uploadedJarLocation = StormSubmitter.submitJar(storm_conf,
                                inputJar);

String jsonConf = JSONValue.toJSONString(storm_conf);
nimbus.getClient().submitTopology("testtopology",
                      <uploadedJarLocation>, jsonConf, builder.createTopology());

这是工作示例:向远程风暴集群提交拓扑

于 2014-07-04T04:47:07.680 回答
4

我没有运行 java 代码来提交自己,但我检查了storm命令 - 它是一个 python 文件,它运行 java 和http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html

我认为您应该担心的唯一一件事是在执行时包含所有需要的库。

于 2013-04-05T10:15:18.860 回答
3

我已经根据@abhi 和@Nishu Tayal 的回答解决了这个问题,我想在这里发布我的代码:

public static void submitLocalTopologyWay1(String topologyName, Config topologyConf, 
        StormTopology topology, String localJar) {
    try {
        //get default storm config
        Map defaultStormConf = Utils.readStormConfig();
        defaultStormConf.putAll(topologyConf);

        //set JAR
        System.setProperty("storm.jar",localJar);

        //submit topology
        StormSubmitter.submitTopology(topologyName, defaultStormConf, topology);

    } catch (Exception e) {
        String errorMsg = "can't deploy topology " + topologyName + ", " + e.getMessage();
        System.out.println(errorMsg);
        e.printStackTrace();
    } 
}

public static void submitLocalTopologyWay2(String topologyName, Config topologyConf, 
        StormTopology topology, String localJar) {
    try {
        //get nimbus client
        Map defaultStormConf = Utils.readStormConfig();
        defaultStormConf.putAll(topologyConf);
        Client client = NimbusClient.getConfiguredClient(defaultStormConf).getClient();

        //upload JAR
        String remoteJar = StormSubmitter.submitJar(defaultStormConf, localJar);

        //submit topology
        client.submitTopology(topologyName, remoteJar, JSONValue.toJSONString(topologyConf), topology);

    } catch (Exception e) {
        String errorMsg = "can't deploy topology " + topologyName + ", " + e.getMessage();
        System.out.println(errorMsg);
        e.printStackTrace();
    } 
}

那么这是一个测试,您必须先将代码构建到 JAR 文件中。

public void testSubmitTopologySubmitLocalTopologyWay1() {   
    Config config = new Config();
    config.put(Config.NIMBUS_HOST,"9.119.84.179");   
    config.put(Config.NIMBUS_THRIFT_PORT, 6627);
    config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("9.119.84.177","9.119.84.178","9.119.84.176")); 
    config.put(Config.STORM_ZOOKEEPER_PORT,2181);

    config.put(Config.TOPOLOGY_WORKERS, 3);

    RemoteSubmitter.submitLocalTopologyWay1("word-count-test-1", config, 
            WordCountTopology.buildTopology(), // your topology
            "C:\\MyWorkspace\\project\\storm-sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar");//the JAR file
}
于 2016-06-14T11:04:47.170 回答