我是apache flink的新手。我的输入中有一个未绑定的数据流(通过 kakfa 输入到 flink 0.10)。
我想获得每个主键的第一次出现(主键是contract_num和event_dt)。
这些“重复”几乎立即发生在彼此之后。源系统无法为我过滤这个,所以 flink 必须这样做。
这是我的输入数据:
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
这是我想要的输出数据:
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
请注意,第二行已被删除,因为 A001 和“2016-02-24 10:25:08”的组合键已经出现在第一行。
我怎么能用 flink 0.10 做到这一点?
我正在考虑使用keyBy(0,1)
,但之后我不知道该怎么做!
(我使用 joda-time 和 org.flinkspector 来设置这些测试)。
@Test
public void test() {
DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
DateTime oneSecondsAgo = (new DateTime()).minusSeconds(2);
DataStream<Tuple3<String, Date, String>> testStream =
createTimedTestStreamWith(
Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
.emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
.emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
.emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
.close();
testStream.keyBy(0,1);
}