1

我想对推文的实时流进行简单分析。

如何在不需要 DAG 的情况下在 Hazelcast Jet 中使用 Twitter 流源?

细节

Twitter API 的封装在StreamTwitterP.java中做得很好。

但是,调用者将其用作 DAG 的一部分,c/o:

Vertex twitterSource = 
  dag.newVertex("twitter", StreamTwitterP.streamTwitterP(properties, terms));

我的用例不需要 DAG 的强大功能,所以我宁愿避免这种不必要的额外复杂性。

为了避免 DAG,我希望使用 SourceBuilder 为实时推文流定义一个新的数据源。

我假设它的代码类似于上面提到的 StreamTwitterP.java,但是我不清楚使用 Hazelcast JET 的 API 是否合适。

我指的是文档中的SourceBuilder 示例

4

1 回答 1

2

您可以将处理器转换为管道源:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>streamFromProcessor("twitter", 
    streamTwitterP(properties, terms)))
...

还有 twitterSource 版本SourceBuilder 在这里使用。

于 2018-11-13T10:07:15.027 回答