1

这是我的代码:

ssc =streamingcontext(sparkcontext,Seconds(time))
spark = sparksession.builder.config(properties).getorcreate()

val Dstream1: ReceiverInputDstream[Document] =  ssc.receiverStream(properties) // Dstream1 has Id1 and other fields

val Rdd2 = spark.sql("select Id1,key from hdfs.table").rdd // RDD[Row]

有没有办法加入这两个?

4

1 回答 1

0

您首先要转换您的 Dstream 和 Rdd 以使用 pairRDD。

这样的事情应该做。

val DstreamTuple = Dstream1.map(x => (x. Id1, x))
val Rdd2Tuple = Rdd2.map(x => (x. Id1, x))

一旦你这样做了,你可以简单地对 dstream 进行转换并将其加入 RDD。

val joinedStream = DstreamTuple.transform(rdd =>
   rdd.leftOuterJoin(Rdd2Tuple)
)

希望这可以帮助 :)

于 2019-09-23T19:18:15.780 回答