我正在尝试将 Storm(请参见此处)集成到我的项目中。我理解了拓扑、spout 和 bolts 的概念。但现在,我试图弄清楚一些事情的实际实现。
A)我有一个使用 Java 和 Clojure 的多语言环境。我的 Java 代码是一个回调类,其中包含触发流数据的方法。推送到这些方法的事件数据是我想用作 spout 的。
所以第一个问题是如何将进入这些方法的数据连接到一个 spout ?我正在尝试i)传递一个backtype.storm.topology.IRichSpout,然后ii)将一个backtype.storm.spout.SpoutOutputCollector(参见此处)传递给该 spout 的打开函数(参见此处)。但我看不到实际传递任何类型的地图或列表的方法。
B)我项目的其余部分都是 Clojure。通过这些方法将有大量数据。每个事件的 ID 介于 1 和 100 之间。在 Clojure 中,我希望将来自 spout 的数据拆分到不同的执行线程中。我认为,这些将是螺栓。
如何设置 Clojure bolt 从 spout 获取事件数据,然后根据传入事件的 ID 中断线程?
提前感谢蒂姆
[编辑 1]
我实际上已经解决了这个问题。我最终1)实现了我自己的 IRichSpout。然后我2)将该 spout 的内部元组连接到我的 java 回调类中的传入流数据。我不确定这是否是惯用的。但它编译并运行没有错误。但是,3)我没有看到通过printstuff螺栓传入的流数据(肯定在那里)。
为了确保事件数据得到传播,在 spout 或 bolt 实现或拓扑定义中是否需要做一些特定的事情?谢谢。
;; 将 Java 回调绑定到我创建的 Spout (.setSpout java-callback ibspout) (storm/defbolt printstuff ["word"] [元组收集器] (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) ) (风暴/拓扑 { "1" (storm/spout-spec ibspout) } { "3" (storm/bolt-spec { "1" :shuffle } 印刷品 ) })
[编辑 2]
根据 SO 成员Ankur的建议,我正在调整我的拓扑结构。创建 Java 回调后,我将它的元组传递给下面的 IBSpout,使用(.setTuple ibspout (.getTuple java-callback))
. 我没有传递整个 Java 回调对象,因为我得到了 NotSerializable 错误。一切都编译并运行没有错误。但同样,我的printstuff螺栓没有数据。嗯。
公共类 IBSpout 实现 IRichSpout { /** *风暴喷口的东西 */ 私人 SpoutOutputCollector _collector; 私有列表 _tuple = new ArrayList(); public void setTuple(List tuple) { _tuple = tuple; } 公共列表 getTuple() { return _tuple; } /** * Storm ISpout 接口函数 */ public void open(Map conf, TopologyContext context, SpoutOutputCollector 收集器) { _collector = 收集器; } 公共无效关闭(){} 公共无效激活(){} 公共无效停用(){} 公共无效 nextTuple() { _collector.emit(_tuple); } 公共无效确认(对象 msgId){} 公共无效失败(对象 msgId){} public void declareOutputFields(OutputFieldsDeclarer 声明者) {} public java.util.Map getComponentConfiguration() { return new HashMap(); } }