我正在为这个项目试验 Storm 和 Trident,我正在使用 Clojure 和 Marceline 来做这件事。我正在尝试扩展Marceline 页面上给出的 wordcount 示例,以便句子 spout 来自 DRPC 调用,而不是来自本地 spout。我遇到的问题是因为 DRPC 流需要有结果才能返回到客户端,但我希望 DRPC 调用有效地返回 null,并简单地更新持久数据。
(defn build-topology
([]
(let [trident-topology (TridentTopology.)]
(let [
;; ### Two alternatives here ###
;collect-stream (t/new-stream trident-topology "words" (mk-fixed-batch-spout 3))
collect-stream (t/drpc-stream trident-topology "words")
]
(-> collect-stream
(t/group-by ["args"])
(t/persistent-aggregate (MemoryMapState$Factory.)
["args"]
count-words
["count"]))
(.build trident-topology)))))
代码中有两种选择 - 一种使用固定的批处理 spout 加载没有问题,但是当我尝试使用 DRPC 流加载代码时,我收到此错误:
InvalidTopologyException(msg:Component: [b-2] subscribes from non-existent component [$mastercoord-bg0])
我相信这个错误来自这样一个事实,即 DRPC 流必须尝试订阅输出才能有一些东西返回给客户端 - 但persistent-aggregate
不提供任何此类输出来订阅。
那么如何设置我的拓扑结构,以便 DRPC 流导致我的持久数据被更新?
小更新:看起来这可能是不可能的:( https://issues.apache.org/jira/browse/STORM-38