3

我正在尝试在 Dstream 和静态 RDD 之间执行连接。

PySpark

  #Create static data
    ip_classification_rdd = sc.parallelize([('log_name','enrichment_success')])
    #Broadcast it to all nodes
    ip_classification_rdd_broadcast = sc.broadcast(ip_classification_rdd)
    #Join stream with static dataset on field log_name      
    joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd[log_name]))

我收到此异常:“您似乎正在尝试广播 RDD 或从“

斯卡拉

但是,这里有人有同样的要求:How to join a DStream with a non-stream file?

这就是解决方案:

val vdpJoinedGeo = goodIPsFltrBI.flatMap{ip => geoDataBC.value.get(ip).map(data=> (ip,data)}

Pyspark 中的等价物是什么?

4

1 回答 1

0

您的代码需要进行一些更改:

  • 您不能广播RDD: 而是在底层“数据”上进行广播:
  • 然后,您使用该value()方法获取闭包内的广播变量

这是您更新后的代码的近似值:

 #Create static data
    data = [('log_name','enrichment_success')])
    #Broadcast it to all nodes
    ip_classification_broadcast = sc.broadcast(data)
    #Join stream with static dataset on field log_name      
    joinedStream = kafkaStream.transform(lambda rdd:  \
        rdd.join(ip_classification_broadcast.value().get[1]))
于 2018-08-17T12:18:16.697 回答