我想知道是否无论如何都使用 Flink 的数据流 API 从传入的记录中删除重复项(可能在特定时间窗口内),就像在提供称为“不同”的转换的 Dataset API 中一样。或者无论如何,如果数据集可以转换为数据流,假设数据集被转换为数据流以在 Flink 中进行内部处理。
请帮助我。提前致谢!干杯!
我想知道是否无论如何都使用 Flink 的数据流 API 从传入的记录中删除重复项(可能在特定时间窗口内),就像在提供称为“不同”的转换的 Dataset API 中一样。或者无论如何,如果数据集可以转换为数据流,假设数据集被转换为数据流以在 Flink 中进行内部处理。
请帮助我。提前致谢!干杯!
我不知道任何内置原语,但如果窗口中的所有数据都适合内存,那么您可以轻松地自己构建这个函数。
DataStream<...> stream = ...
stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new DistinctFunction<>());
public class DistinctFunction<T, W extends Window> extends ProcessAllWindowFunction<T, T, W> implements Function {
public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception {
Set<T> elements = new HashSet<>();
input.forEach(elements::add);
elements.forEach(out::collect);
}
}
当然,如果你有一个键,它的可扩展性会更高,因为只有窗口中一个键的数据需要保存在内存中。