我是databricks和pyspark的初学者。目前,我有一个包含 3 列的 pyspark 数据框:
- 日期
- 数量
- 货币
我想将金额列转换为欧元并用当天的汇率计算。为此,我使用汇率 API 通过将日期和货币作为参数来查找汇率。
首先,我定义了一个调用 API 来查找汇率的函数
这是我的代码:
def API(val1,currency,date):
r = requests.get('https://api.exchangeratesapi.io/'+date,params={'symbols':currency})
df = spark.read.json(sc.parallelize([r.json()]))
df_value = df.select(F.col("rates."+currency))
value = df_value.collect()[0][2]
val = val1*(1/value)
return float(val)
然后,我定义了一个 UDF 来在我的数据框中调用这个函数:
API_Convert = F.udf(lambda x,y,z : API(x,y,z) if (y!= 'EUR') else x, FloatType())
当我尝试执行这部分时,我得到了我绝对不明白的酸洗错误......
df = df.withColumn('amount',API_Convert('amount','currency','date'))
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
你能帮我解决这个问题吗?
非常感谢