2

我想通过 spout 从一个数据库中获取数据并处理数据并使用 trident 将其存储在另一个数据库中。我是 Storm 和 trident 的新手,我不确定如何实现它。我从数据库中获取数据spout(实现 trident 支持的 IRichSpout 的单独 java 类),我将它作为对象发出。我需要将它传递给 trident 拓扑进行处理(计算记录数)并将其存储到数据库中。

 TridentTopology topology = new TridentTopology();  
 TridentState wordCounts =
          topology.newStream("spout1",spout)

现在新的流需要一个喷口作为输入,即语法是

 Stream storm.trident.TridentTopology.newStream(String txId, IRichSpout spout)

但我想将 spout 发出的对象作为流的输入,供 trident 处理并保存到数据库。那么我怎样才能将我的 spout 类带入 trident 并将其传递给新的流,或者我应该将 spout 和三叉戟同班??

有人可以帮忙吗......

4

1 回答 1

1

你可以做类似的事情

    MyFooSpout spout = new MyFooSpout();
    topology.newStream("spout1", spout)....

MyFooSpout类应该在哪里实现IRichSpout

来自trident 教程 中的newStream方法在TridentTopology从任何输入源读取的拓扑中创建数据流。

在你的情况下,它可能是MyFooSpout

.我在一个 spout 中从数据库中获取数据(实现 trident 支持的 IRichSpout 的单独的 java 类),我将它作为一个对象发出

你能澄清一下你到底指的是什么吗?你的 spout 代码是什么样子的?作为一个非常通用的示例,如果我们编写类似(取自教程页面)

    TridentState wordCounts = topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word"))

这意味着spout应该发出一个字段,即sentence. 通过调用eachSplit函数将应用于流中的每个元组,这将根据通过获取该sentence字段编写的任何代码执行。但是,这可能会根据您的要求而有所不同。例如,它可以是 a FilterasMyFilter extends BaseFilter或 a functionas MyCustomFuction extends BaseFunction。查看 API 页面了解更多详情。

于 2013-09-13T20:00:27.957 回答