我有一个包含可能损坏/恶意数据的数据集。数据带有时间戳。我正在使用启发式函数对数据进行评级。一段时间后,我知道所有带有一些 ID 的新数据项都需要被丢弃,它们代表了很大一部分数据(高达 40%)。
现在我有两个批处理管道:
- 第一个只是对数据进行评级。
- 第二个首先过滤掉损坏的数据并运行分析。
我想从批处理模式(例如,每天运行)切换到在线处理模式(希望延迟 < 10 分钟)。
第二个管道使用全局窗口,使处理变得容易。当检测到损坏的数据键时,所有其他记录都被简单地丢弃(也很容易使用前几天丢弃的键作为预过滤器)。此外,它可以更轻松地对输出数据做出决策,因为在处理给定键的所有历史数据期间都是可用的。
主要问题是:我可以在数据流 DAG 中创建循环吗?假设我想累积给我处理的每个会话窗口的质量率,如果速率总和超过 X,管道早期阶段的一些过滤器功能应该过滤掉恶意密钥。
我知道侧面输入,我不知道它是否可以在运行时改变。
我知道根据定义,DAG 不能有循环,但是没有它如何达到相同的结果?
我想到的想法是使用侧面输出将 ID 标记为恶意并制作假的无界输出/输入。输出会将数据转储到某个存储中,输入将每小时加载一次并流式传输,以便可以加入。