1

我正在为这个项目试验 Storm 和 Trident,我正在使用 Clojure 和 Marceline 来做这件事。我正在尝试扩展Marceline 页面上给出的 wordcount 示例,以便句子 spout 来自 DRPC 调用,而不是来自本地 spout。我遇到的问题是因为 DRPC 流需要有结果才能返回到客户端,但我希望 DRPC 调用有效地返回 null,并简单地更新持久数据。

(defn build-topology
  ([]
   (let [trident-topology (TridentTopology.)]
     (let [
           ;; ### Two alternatives here ###
           ;collect-stream (t/new-stream trident-topology "words" (mk-fixed-batch-spout 3))
            collect-stream (t/drpc-stream trident-topology "words")
          ]
          (-> collect-stream
              (t/group-by ["args"])
              (t/persistent-aggregate (MemoryMapState$Factory.)
                                      ["args"]
                                      count-words
                                      ["count"]))
          (.build trident-topology)))))

代码中有两种选择 - 一种使用固定的批处理 spout 加载没有问题,但是当我尝试使用 DRPC 流加载代码时,我收到此错误:

InvalidTopologyException(msg:Component: [b-2] subscribes from non-existent component [$mastercoord-bg0])

我相信这个错误来自这样一个事实,即 DRPC 流必须尝试订阅输出才能有一些东西返回给客户端 - 但persistent-aggregate不提供任何此类输出来订阅。

那么如何设置我的拓扑结构,以便 DRPC 流导致我的持久数据被更新?

小更新:看起来这可能是不可能的:( https://issues.apache.org/jira/browse/STORM-38

4

0 回答 0