我有一个大数据框(~30M 行)。我有一个功能f
。的业务f
是遍历每一行,检查一些逻辑并将输出输入字典。该功能需要逐行执行。
我试过了:
dic = dict() for row in df.rdd.collect(): f(row, dic)
但我总是遇到错误OOM。我将 Docker 的内存设置为 8GB。
我怎样才能有效地开展业务?
我有一个大数据框(~30M 行)。我有一个功能f
。的业务f
是遍历每一行,检查一些逻辑并将输出输入字典。该功能需要逐行执行。
我试过了:
dic = dict() for row in df.rdd.collect(): f(row, dic)
但我总是遇到错误OOM。我将 Docker 的内存设置为 8GB。
我怎样才能有效地开展业务?
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, MapType
#sample data
df = sc.parallelize([
['a', 'b'],
['c', 'd'],
['e', 'f']
]).toDF(('col1', 'col2'))
#add logic to create dictionary element using rows of the dataframe
def add_to_dict(l):
d = {}
d[l[0]] = l[1]
return d
add_to_dict_udf = udf(add_to_dict, MapType(StringType(), StringType()))
#struct is used to pass rows of dataframe
df = df.withColumn("dictionary_item", add_to_dict_udf(struct([df[x] for x in df.columns])))
df.show()
#list of dictionary elements
dictionary_list = [i[0] for i in df.select('dictionary_item').collect()]
print dictionary_list
输出是:
[{u'a': u'b'}, {u'c': u'd'}, {u'e': u'f'}]
通过使用collect
,您将所有数据从 Spark 执行器中提取到您的驱动程序中。你真的应该避免这种情况,因为它使使用 Spark 毫无意义(在这种情况下你可以只使用普通的 python)。
你能做什么:
使用已经可用的函数重新实现您的逻辑:pyspark.sql.functions doc
如果你不能做第一个,因为缺少功能,你可以定义一个用户定义的函数