我有一个流束管道,我尝试在其中监控多个 glob/regex 模式。这些模式中很少有文件匹配,并且将来会生成很少的模式。
PCollection<String> fileGlobs = p.apply(Create.of(filePatterns));
PCollection<Metadata> f = fileGlobs.apply("MatchALL",
FileIO.matchAll().continuously(
Duration.standardSeconds(10),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
f = .. some more transformations and then write to gcs ..
预期的行为是将现有文件与提供的模式匹配,并监视它们以查看是否正在将与这些模式匹配的新文件写入 GCS。我强制执行的终止条件是,如果生成的最后一个与特定模式匹配的文件是一个多小时前生成的,则不要尝试匹配模式。观察到的行为是我们匹配了很多文件,但是在获得无限 f 之后的转换根本没有被执行。日志只显示
polling returned 681384 results, of which 681384 were new. The output is incomplete.
我给出了 2 种不同的正则表达式模式来监视。现有的正则表达式模式之一已经有大约 500k 文件匹配,并且每分钟都在添加更多文件,我从未看到过输出,只是上面的日志行。第二个正则表达式模式匹配 0 个文件(在启动管道时),但一旦在未来某个时间点开始与新出现的文件匹配,这些输出文件就会被写入 gcs。
有人可以解释这种行为吗,如果我连续正确使用匹配。我没有在这里创建任何窗口,因为我的用例非常简单,流文件 -> 读取文件 -> 过滤一些事件 -> 将这些文件写回一些 gcs 存储桶。