0

我正在尝试从kafka读取数据并使用storm插入cassandra。我也配置了拓扑,但是我遇到了一些问题,我不知道为什么会这样。

这是我的提交者作品。

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
        topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt"); 

在这里,如果我评论最后一行,我看不到任何异常。最后一行,我收到以下错误:

InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])

有人可以帮我吗,这里有什么问题?

这是 CheckingBolt 中的 outputFieldDeclarer

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}

我在 CassandraInsertBolt 的 declareOutputFields 方法中没有任何内容,因为该螺栓不会发出任何值。

TIA

4

1 回答 1

1

这里的问题是您混淆了流名称和组件(即 spout/bolt)名称。组件名称用于引用不同的螺栓,而流名称用于引用来自同一螺栓的不同流。例如,如果您有一个名为“evenOrOddBolt”的螺栓,它可能会发出两个流,一个“偶数”流和一个“奇数”流。但是在很多情况下,你只有一个流来自一个 bolt,这就是为什么 Storm 有一些方便的方法来使用默认流名称。

当你这样做时.shuffleGrouping("checkingbolt"),你正在使用这些方便的方法之一,有效地说“我希望这个螺栓使用来自的默认流checkingbolt”。如果您想显式命名流,则可以使用此方法的重载版本,但仅当您有多个流来自同一个螺栓时,它才有用。

当你这样做时ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));,你是说螺栓将在名为“cassandraBoltStream”的流上发出。这可能不是您想要做的,您想要声明它将在默认流上发出。您可以改用该ofd.declare方法来执行此操作。

有关更多详细信息,请参阅文档

于 2020-01-21T14:03:59.677 回答