1

我有一个流束管道,我尝试在其中监控多个 glob/regex 模式。这些模式中很少有文件匹配,并且将来会生成很少的模式。

PCollection<String> fileGlobs = p.apply(Create.of(filePatterns));

PCollection<Metadata> f = fileGlobs.apply("MatchALL",
    FileIO.matchAll().continuously(
        Duration.standardSeconds(10),
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

f = .. some more transformations and then write to gcs ..

预期的行为是将现有文件与提供的模式匹配,并监视它们以查看是否正在将与这些模式匹配的新文件写入 GCS。我强制执行的终止条件是,如果生成的最后一个与特定模式匹配的文件是一个多小时前生成的,则不要尝试匹配模式。观察到的行为是我们匹配了很多文件,但是在获得无限 f 之后的转换根本没有被执行。日志只显示

polling returned 681384 results, of which 681384 were new. The output is incomplete.

我给出了 2 种不同的正则表达式模式来监视。现有的正则表达式模式之一已经有大约 500k 文件匹配,并且每分钟都在添加更多文件,我从未看到过输出,只是上面的日志行。第二个正则表达式模式匹配 0 个文件(在启动管道时),但一旦在未来某个时间点开始与新出现的文件匹配,这些输出文件就会被写入 gcs。

有人可以解释这种行为吗,如果我连续正确使用匹配。我没有在这里创建任何窗口,因为我的用例非常简单,流文件 -> 读取文件 -> 过滤一些事件 -> 将这些文件写回一些 gcs 存储桶。

4

1 回答 1

2

这是Splittable DoFn中的一个错误,如果单轮轮询时间超过 10 秒,它会影响Watch转换 - 当观看匹配大量文件的文件模式时会发生这种情况。该错误导致不产生任何输出,因为转换在取得任何进展之前就已设置检查点,因此当它从检查点恢复时,从某种意义上说,它是“回到正方形 1”。

请关注JIRA以获取更新和建议的解决方法。

于 2018-01-21T00:17:42.990 回答