0

我是databricks和pyspark的初学者。目前,我有一个包含 3 列的 pyspark 数据框:

  • 日期
  • 数量
  • 货币

我想将金额列转换为欧元并用当天的汇率计算。为此,我使用汇率 API 通过将日期和货币作为参数来查找汇率。

首先,我定义了一个调用 API 来查找汇率的函数

这是我的代码:

def API(val1,currency,date):
  r = requests.get('https://api.exchangeratesapi.io/'+date,params={'symbols':currency})
  df = spark.read.json(sc.parallelize([r.json()]))
  df_value = df.select(F.col("rates."+currency))
  value = df_value.collect()[0][2]
  val = val1*(1/value)

  return float(val) 

然后,我定义了一个 UDF 来在我的数据框中调用这个函数:

API_Convert = F.udf(lambda x,y,z : API(x,y,z) if (y!= 'EUR') else x, FloatType())

当我尝试执行这部分时,我得到了我绝对不明白的酸洗错误......

df = df.withColumn('amount',API_Convert('amount','currency','date'))
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

你能帮我解决这个问题吗?

非常感谢

4

0 回答 0