0

我正在接收两个套接字流 S1 和 S2 分别具有模式 S1 和 S2。

我想使用火花流加入关于属性“a”的 S1 和 S2。以下是我的代码:

    sc = SparkContext("local[3]", "StreamJoin")
    ssc = StreamingContext(sc, 1) 

    S1 = ssc.socketTextStream("localhost", 9999)
    S2 = ssc.socketTextStream("localhost", 8085)

    # Create windowed stream
    wS1 = S1.window(10)
    wS2 = S2.window(1)

    wS1.flatMap(lambda line: line.split(",")).pprint()
    wS2.flatMap(lambda line: line.split(",")).pprint()

    # Perform join
    joinedStream = wS1.join(wS2)

    joinedStream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))

    ssc.start()             
    ssc.awaitTermination()

S1 和 S2 都用逗号分隔。

虽然上面的代码执行了连接,但是相对于完整的行。

我有兴趣加入关于特定属性的两个流,在本例中为属性“a”。我怎样才能做到这一点?

非常感谢!

4

1 回答 1

0

join 在 spark 中的工作方式是它基于键连接 rdd 行,键是 row[0] 处的值。所以你可以这样做:

wS1.flatMap(lambda line: line.split(",")).map(lambda x: (x[0], x)).pprint()
wS2.flatMap(lambda line: line.split(",")).map(lambda x: (x[0], x)).pprint()

然后将根据拆分列表的第一个元素进行连接。

文档参考:

https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=join#pyspark.RDD.join

于 2017-12-05T12:41:02.787 回答