1
JavaRDD<String> history_ = sc.emptyRDD();

java.util.Queue<JavaRDD<String> > queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);

JavaPairDStream<String,ArrayList<String>> history = history_dstream.mapToPair(r -> {
  return new Tuple2< String,ArrayList<String> >(null,null);
});  

 JavaPairInputDStream<String, GenericData.Record> stream_1 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_1);


JavaPairInputDStream<String, GenericData.Record> stream_2 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_2);

然后进行一些转换并创建类型为 twp DStream Data_1 和 Data_2

JavaPairDStream<String, <ArrayList<String>>

并按如下方式进行连接,然后过滤掉那些没有连接键的记录并将它们保存在历史记录中,以便通过与 Data_1 进行联合来在下一批中使用它

 Data_1 = Data_1.union(history);

JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
    Data_1.leftOuterJoin(Data_2).cache();


JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());

history = dstream_filtered.mapToPair(r -> {
  return new Tuple2<>(r._1,r._2._1);
}).persist;

我在上一步之后获得了历史记录(通过将其保存到 hdfs 进行检查),但是在进行联合时,这个历史记录仍然是批量空的。

4

1 回答 1

2

从概念上讲,“记住” a 是不可能的DStreamDStreams是有时间限制的,并且在每个时钟滴答(称为“批处理间隔”)上,它DStream 表示在该时间段内流中观察到的数据。

因此,我们不能DStream保存一个“旧”来加入一个“新” DStream。所有人都DStreams活在“现在”。

的底层数据结构DStreamsRDD:每个批次间隔,我们DStream将有 1RDD个该间隔的数据。 RDDs 表示数据的分布式集合。RDDs 是不可变的和永久的,只要我们有对它们的引用。

我们可以结合RDDs 和DStreams 来创建这里需要的“历史翻转”。

它看起来与问题的方法非常相似,但仅使用history RDD.

以下是建议更改的高级视图:

var history: RDD[(String, List[String]) = sc.emptyRDD()

val dstream1 = ...
val dstream2 = ...

val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)

... do stuff with joined as above, obtain dstreamFiltered ...

dstreamFiltered.foreachRDD{rdd =>
   val formatted = rdd.map{case (k,(v1,v2)) => (k,v1)} // get rid of the join info
   history.unpersist(false) // unpersist the 'old' history RDD
   history = formatted // assign the new history
   history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
   history.count() //action to materialize this transformation
}

这只是一个起点。还有关于checkpointing 的其他注意事项。否则,historyRDD 的沿袭将无限增长,直到某些 StackOverflow 发生。这个博客对这种特殊技术非常完整:http ://www.spark.tc/stateful-spark-streaming-using-transform/

我还建议您使用 Scala 而不是 Java。Java 语法过于冗长,无法与 Spark Streaming 一起使用。

于 2017-06-08T13:35:43.807 回答