我正在尝试从kafka读取数据并使用storm插入cassandra。我也配置了拓扑,但是我遇到了一些问题,我不知道为什么会这样。
这是我的提交者作品。
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt");
在这里,如果我评论最后一行,我看不到任何异常。最后一行,我收到以下错误:
InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])
有人可以帮我吗,这里有什么问题?
这是 CheckingBolt 中的 outputFieldDeclarer
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}
我在 CassandraInsertBolt 的 declareOutputFields 方法中没有任何内容,因为该螺栓不会发出任何值。
TIA