0

我在 Apache Storm 1.1.0 中运行一个简单的 Hello World 类型的应用程序。应用程序有一个随机整数 spout 和一个打印元组输出的 bolt。但不知何故,我无法让它在我的 Windows 系统上运行。

我是 Apache Storm 的新手,并且正在学习教程。我已经在堆栈溢出中寻找答案,但我找不到任何已解决的问题。

以下是我运行的拓扑代码:

public static void runTopology() {
    //String filePath = "./src/main/resources/operations.txt";
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("randomNumberSpout", new RandomIntSpout());
    builder.setBolt("printingBolt", new PrintingBolt()).shuffleGrouping("randomNumberSpout");

    Config config = new Config();
    config.setDebug(true);
    LocalCluster cluster = new LocalCluster();
    try{
        cluster.submitTopology("Test", config, builder.createTopology());
    }finally{
        cluster.shutdown();
    }
}

螺栓代码

public class PrintingBolt extends BaseBasicBolt {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println("Printing Tupple!!!!");
        System.out.println(tuple);
        System.out.println("Tupple processed " + tuple.getInteger(1));
        basicOutputCollector.emit(new Values(tuple.getInteger(1)));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("TestOutput"));

    }
}

喷口代码

public class RandomIntSpout extends BaseRichSpout {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private Random random;
    private SpoutOutputCollector outputCollector;

    /*@Override
    public void open(Map<String,Object> map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }*/

    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            random = new Random();
            outputCollector = collector;
    }
}

我也可以提供其余代码,但我认为这不是必需的。如果需要,请在评论中提及,我也会提供。

每当我尝试运行应用程序时,都会出现以下错误。

10620 [main] INFO oassoazZooKeeper - 启动客户端连接,connectString=localhost:2000/storm sessionTimeout=20000 > watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@31b0f02 10625 [main-SendThread(0:0 :0:0:0:0:0:1:2000)] 信息 oassoazClientCnxn - 打开与服务器的套接字连接 > 0:0:0:0:0:0:0:1/0:0:0:0:0 :0:0:1:2000。不会尝试使用 SASL 进行身份验证(未知错误)10627 [main-SendThread(0:0:0:0:0:0:0:1:2000)] 信息 oassoazClientCnxn - 套接字连接建立到 > 0:0:0: 0:0:0:0:1/0:0:0:0:0:0:0:1:2000,启动会话 10627 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] 信息 oassoazsNIOServerCnxnFactory - 接受来自 /> 0:0:0:0:0:0:0:1:56905 10628 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] 信息 oassoazs 的套接字连接

oazsNIOServerCnxn - 客户端的关闭套接字连接 /> 0:0:0:0:0:0:0:1:56905 其 sessionid 为 0x16a8e5abd97000d 10746 [main] INFO oassoacfiCuratorFrameworkImpl - 开始 10747 [main] INFO oassoazZooKeeper - 启动客户端连接,connectString =localhost:2000/storm sessionTimeout=20000 > watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@73893ec1 10755 [main-SendThread(127.0.0.1:2000)] INFO oassoazClientCnxn - 打开到服务器的套接字连接127.0.0.1/127.0.0.1:2000。不会 > 尝试使用 SASL 进行身份验证(未知错误)10756 [main-SendThread(127.0.0.1:2000)] 信息 oassoazClientCnxn - 与 127.0.0.1/127.0.0.1:2000 建立的套接字连接,正在启动 > 会话 10758 [NIOServerCxn.Factory :0.0.0.0/0.0.0.0:2000] 信息 oassoazs 0/0.0.0.0:2000] INFO oassoazsNIOServerCnxn - 客户端的关闭套接字连接 /> 0:0:0:0:0:0:0:1:56878 其 sessionid 为 0x16a8e5abd970004 11074 [main] INFO oassoazZooKeeper - 会话:0x16a8e5abd970004 已关闭11077 [Curator-Framework-0] 信息 oassoacfiCuratorFrameworkImpl - backgroundOperationsLoop 退出 11079 [ProcessThread(sid:0 cport:-1):] 信息 oassoazsPrepRequestProcessor - sessionid 的已处理会话终止:> 0x16a8e5abd970000 11081 [main] 信息 oassoaza6 1108d1 会话 97001:0x [main] INFO oaszookeeper - 关闭领导选举人的 zookeeper 连接。11082 [main-EventThread] INFO oassoazClientCnxn - EventThread 关闭 11082 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] 信息 oassoazs SessionTrackerImpl - 关闭 11116 [main] 信息 oassoazsPrepRequestProcessor - 关闭 11117 [main] 信息 oassoazsSyncRequestProcessor - 关闭 11117 [SyncThread:0] 信息 oassoazsSyncRequestProcessor - SyncRequestProcessor 已退出!11117 [ProcessThread(sid:0 cport:-1):] 信息 oassoazsPrepRequestProcessor - PrepRequestProcessor 退出循环!11117 [main] INFO oassoazsFinalRequestProcessor - 请求处理器的关闭完成 11118 [main] INFO oastesting - 在进程 Zookeeper 中完成关闭 11118 [main] INFO oastesting - 删除临时路径 C:\Users\AKHAND~1\AppData\Local\Temp\ ae4119b4-70b3-4d04-9aee-5bfae4c4775b 11203 [main] INFO oastesting - 删除临时路径 C:

我无法理解为什么关闭客户端套接字以及为什么关闭会话?我无法让它工作。请帮忙。

4

1 回答 1

3

我想你可能需要在这里添加一个睡眠

try{
        cluster.submitTopology("Test", config, builder.createTopology());
        //Sleep here
    }finally{
        cluster.shutdown();
    }

当前您正在提交拓扑,并立即关闭。除非您睡一会,否则您的拓扑将没有机会运行。

于 2019-05-06T19:36:52.127 回答