我正在使用 Apache Spark Streaming 1.6.1 编写一个 Java 应用程序,该应用程序连接两个键/值数据流并将输出写入 HDFS。这两个数据流包含 K/V 字符串,并使用 textFileStream() 从 HDFS 定期摄取到 Spark 中。
两个数据流不同步,这意味着在 t0 时刻在 stream1 中的某些键可能在 t1 时刻出现在 stream2 中,反之亦然。因此,我的目标是连接两个流并计算“剩余”键,这应该在下一个批处理间隔中考虑用于连接操作。
为了更好地阐明这一点,请查看以下算法:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
我试图用 Spark Streaming 实现这个算法没有成功。最初,我以这种方式为剩余键创建了两个空流(这只是一个流,但生成第二个流的代码类似):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
稍后,这个空流与stream1 统一(即union()),最后,在join 之后,我添加了stream1 的剩余键并调用window()。stream2 也是如此。
问题是生成 left_keys_s1 和 left_keys_s2 的操作是没有操作的转换,这意味着 Spark 不会创建任何 RDD 流图,因此它们永远不会被执行。我现在得到的是一个连接,它只输出键在同一时间间隔内位于 stream1 和 stream2 中的记录。
你们有什么建议可以用 Spark 正确实现吗?
谢谢,马可