我正在尝试在 Dstream 和静态 RDD 之间执行连接。
PySpark
#Create static data
ip_classification_rdd = sc.parallelize([('log_name','enrichment_success')])
#Broadcast it to all nodes
ip_classification_rdd_broadcast = sc.broadcast(ip_classification_rdd)
#Join stream with static dataset on field log_name
joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd[log_name]))
我收到此异常:“您似乎正在尝试广播 RDD 或从“
斯卡拉
但是,这里有人有同样的要求:How to join a DStream with a non-stream file?
这就是解决方案:
val vdpJoinedGeo = goodIPsFltrBI.flatMap{ip => geoDataBC.value.get(ip).map(data=> (ip,data)}
Pyspark 中的等价物是什么?