我正在接收两个套接字流 S1 和 S2 分别具有模式 S1 和 S2。
我想使用火花流加入关于属性“a”的 S1 和 S2。以下是我的代码:
sc = SparkContext("local[3]", "StreamJoin")
ssc = StreamingContext(sc, 1)
S1 = ssc.socketTextStream("localhost", 9999)
S2 = ssc.socketTextStream("localhost", 8085)
# Create windowed stream
wS1 = S1.window(10)
wS2 = S2.window(1)
wS1.flatMap(lambda line: line.split(",")).pprint()
wS2.flatMap(lambda line: line.split(",")).pprint()
# Perform join
joinedStream = wS1.join(wS2)
joinedStream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))
ssc.start()
ssc.awaitTermination()
S1 和 S2 都用逗号分隔。
虽然上面的代码执行了连接,但是相对于完整的行。
我有兴趣加入关于特定属性的两个流,在本例中为属性“a”。我怎样才能做到这一点?
非常感谢!