5

我正在尝试将 Storm(请参见此处)集成到我的项目中。我理解了拓扑、spout 和 bolts 的概念。但现在,我试图弄清楚一些事情的实际实现。

A)我有一个使用 Java 和 Clojure 的多语言环境。我的 Java 代码是一个回调类,其中包含触发流数据的方法。推送到这些方法的事件数据是我想用作 spout 的。

所以第一个问题是如何将进入这些方法的数据连接到一个 spout ?我正在尝试i)传递一个backtype.storm.topology.IRichSpout,然后ii)将一个backtype.storm.spout.SpoutOutputCollector参见此处)传递给该 spout 的打开函数(参见此处)。但我看不到实际传递任何类型的地图或列表的方法。

B)我项目的其余部分都是 Clojure。通过这些方法将有大量数据。每个事件的 ID 介于 1 和 100 之间。在 Clojure 中,我希望将来自 spout 的数据拆分到不同的执行线程中。我认为,这些将是螺栓。

如何设置 Clojure bolt 从 spout 获取事件数据,然后根据传入事件的 ID 中断线程?

提前感谢蒂姆

[编辑 1]

我实际上已经解决了这个问题。我最终1)实现了我自己的 IRichSpout。然后我2)将该 spout 的内部元组连接到我的 java 回调类中的传入流数据。我不确定这是否是惯用的。但它编译并运行没有错误。但是,3)我没有看到通过printstuff螺栓传入的流数据(肯定在那里)。

为了确保事件数据得到传播,在 spout 或 bolt 实现或拓扑定义中是否需要做一些特定的事情?谢谢。

      ;; 将 Java 回调绑定到我创建的 Spout
      (.setSpout java-callback ibspout)

      (storm/defbolt printstuff ["word"] [元组收集器]
        (println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
      )
      (风暴/拓扑
       { "1" (storm/spout-spec ibspout)
       }
       { "3" (storm/bolt-spec { "1" :shuffle }
                               印刷品
             )
       })

[编辑 2]

根据 SO 成员Ankur的建议,我正在调整我的拓扑结构。创建 Java 回调后,我将它的元组传递给下面的 IBSpout,使用(.setTuple ibspout (.getTuple java-callback)). 我没有传递整个 Java 回调对象,因为我得到了 NotSerializable 错误。一切都编译并运行没有错误。但同样,我的printstuff螺栓没有数据。嗯。

    公共类 IBSpout 实现 IRichSpout {

      /**
       *风暴喷口的东西
       */
      私人 SpoutOutputCollector _collector;

      私有列表 _tuple = new ArrayList();
      public void setTuple(List tuple) { _tuple = tuple; }
      公共列表 getTuple() { return _tuple; }

      /**
       * Storm ISpout 接口函数
       */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector 收集器) {
        _collector = 收集器;
      }
      公共无效关闭(){}
      公共无效激活(){}
      公共无效停用(){}
      公共无效 nextTuple() {
        _collector.emit(_tuple);
      }
      公共无效确认(对象 msgId){}
      公共无效失败(对象 msgId){}


      public void declareOutputFields(OutputFieldsDeclarer 声明者) {}
      public java.util.Map getComponentConfiguration() { return new HashMap(); }

    }

4

2 回答 2

3

似乎您正在将 spout 传递给您的回调类,这似乎有点奇怪。当一个拓扑被执行时,storm会定期调用spoutsnextTuple方法,因此你需要做的是将java回调传递给你的自定义spout实现,这样当storm调用你的spout时,spout调用java回调来获取下一组元组被馈送到拓扑中。

要理解的关键概念是 Spouts在风暴请求时提取数据,您不会将数据推送到 spouts。您的回调不能调用 spout 将数据推送到它,而是当您的 spoutnextTuple方法被调用时,您的 spout 应该(从某些 java 方法或任何内存缓冲区)提取数据。

于 2013-04-03T04:42:44.310 回答
0

对 B 部分的回答:

对我来说,直截了当的答案听起来就像您正在寻找一个字段分组,这样您就可以控制在执行期间按 ID 将哪些作品分组在一起。

也就是说,我不相信这真的是一个完整的答案,因为我不知道你为什么要这样做。如果您只想要平衡的工作负载,则随机分组是更好的选择。

于 2013-04-02T19:24:28.437 回答