1

我根据本手册创建了一个源函数。

public static void main(String[] args) throws Exception {
    DirectProvider dp = new DirectProvider();
    Topology top = dp.newTopology();

    final URL url = new URL("http://finance.yahoo.com/d/quotes.csv?s=BAC+COG+FCX&f=snabl");

    TStream<String> linesOfWebsite = top.source(queryWebsite(url));
}

现在我想过滤这个流。我有这样的想法:

TStream<Iterable<String>> simpleFiltered = source.filter(item-> item.contains("BAX");

这是行不通的。有人知道如何过滤流吗?我不想更改请求 url 来预先进行过滤。

4

1 回答 1

0

从提供的信息很难判断。 dp.submit(top)需要运行拓扑。筛选器代码未指定使用正在指定的 URL 发生的项目。例如,

...
TStream<String> linesOfWebsite = top.source(queryWebsite(url));
linesOfWebsite.print(); // show what's received

TStream<String> filtered = linesOfWebsite.filter(t -> t.contains("BAC"));
filtered.sink(t -> System.out.println("filtered: " + t));

dp.submit(top);  // required
于 2016-12-16T20:29:07.777 回答