我正在尝试将 Storm(请参见此处)集成到我的项目中。我理解了拓扑、spout 和 bolts 的概念。但现在,我试图弄清楚一些事情的实际实现。
A)我有一个使用 Java 和 Clojure 的多语言环境。我的 Java 代码是一个回调类,其中包含触发流数据的方法。推送到这些方法的事件数据是我想用作 spout 的。
所以第一个问题是如何将进入这些方法的数据连接到一个 spout ?我正在尝试i)传递一个backtype.storm.topology.IRichSpout,然后ii)将一个backtype.storm.spout.SpoutOutputCollector(参见此处)传递给该 spout 的打开函数(参见此处)。但我看不到实际传递任何类型的地图或列表的方法。
B)我项目的其余部分都是 Clojure。通过这些方法将有大量数据。每个事件的 ID 介于 1 和 100 之间。在 Clojure 中,我希望将来自 spout 的数据拆分到不同的执行线程中。我认为,这些将是螺栓。
如何设置 Clojure bolt 从 spout 获取事件数据,然后根据传入事件的 ID 中断线程?
提前感谢蒂姆
[编辑 1]
我实际上已经解决了这个问题。我最终1)实现了我自己的 IRichSpout。然后我2)将该 spout 的内部元组连接到我的 java 回调类中的传入流数据。我不确定这是否是惯用的。但它编译并运行没有错误。但是,3)我没有看到通过printstuff螺栓传入的流数据(肯定在那里)。
为了确保事件数据得到传播,在 spout 或 bolt 实现或拓扑定义中是否需要做一些特定的事情?谢谢。
;; 将 Java 回调绑定到我创建的 Spout
(.setSpout java-callback ibspout)
(storm/defbolt printstuff ["word"] [元组收集器]
(println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
)
(风暴/拓扑
{ "1" (storm/spout-spec ibspout)
}
{ "3" (storm/bolt-spec { "1" :shuffle }
印刷品
)
})
[编辑 2]
根据 SO 成员Ankur的建议,我正在调整我的拓扑结构。创建 Java 回调后,我将它的元组传递给下面的 IBSpout,使用(.setTuple ibspout (.getTuple java-callback)). 我没有传递整个 Java 回调对象,因为我得到了 NotSerializable 错误。一切都编译并运行没有错误。但同样,我的printstuff螺栓没有数据。嗯。
公共类 IBSpout 实现 IRichSpout {
/**
*风暴喷口的东西
*/
私人 SpoutOutputCollector _collector;
私有列表 _tuple = new ArrayList();
public void setTuple(List tuple) { _tuple = tuple; }
公共列表 getTuple() { return _tuple; }
/**
* Storm ISpout 接口函数
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector 收集器) {
_collector = 收集器;
}
公共无效关闭(){}
公共无效激活(){}
公共无效停用(){}
公共无效 nextTuple() {
_collector.emit(_tuple);
}
公共无效确认(对象 msgId){}
公共无效失败(对象 msgId){}
public void declareOutputFields(OutputFieldsDeclarer 声明者) {}
public java.util.Map getComponentConfiguration() { return new HashMap(); }
}