I'm running a PySpark job, and at one point I use a grouped aggregate Pandas UDF. This results in the following (abbreviated here) error:
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
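For context, the UDF follows the standard grouped aggregate pattern. A minimal sketch of that pattern (the toy data, column names, and the aggregation itself are illustrative placeholders, not my real code):

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the real DataFrame; "device_id" and "value"
# are placeholder column names.
df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 3.0)], ["device_id", "value"]
)

# Grouped aggregate Pandas UDF: each invocation receives an entire
# group's column as a pandas.Series and returns a single scalar.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Spark ships each group to the Python worker as one Arrow record
# batch, which is presumably where an oversized group overflows
# the Arrow buffer.
result = df.groupBy("device_id").agg(mean_udf(df["value"]))
result.show()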
I'm fairly sure this happens because one of the groups the Pandas UDF receives is huge: if I shrink the dataset by removing enough rows, the UDF runs with no problem. But I want to run on my original dataset, and even when I run this Spark job on a machine with 192.0 GiB of RAM, I still hit the same error. (And 192.0 GiB should be enough to hold the whole dataset in memory.)
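For what it's worth, this is roughly how I checked for an oversized group (again, "device_id" stands in for my real grouping column):

from pyspark.sql import functions as F

# Row count per group, largest first, to spot the oversized group.
df.groupBy("device_id").count().orderBy(F.desc("count")).show(10)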
How can I give Spark enough memory to run a grouped aggregate Pandas UDF that needs a lot of memory?
For example, is there some Spark configuration I've missed that would give Apache Arrow more memory?
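To be concrete, I mean settings along these lines; the values here are guesses on my part, and I don't know which of these, if any, actually governs Arrow's allocations:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Plain JVM executor memory; already large on this machine.
    .config("spark.executor.memory", "150g")
    # Overhead memory outside the JVM heap, where the Python
    # workers (and their Arrow buffers) live.
    .config("spark.executor.memoryOverhead", "32g")
    # Caps the rows per Arrow batch for ordinary conversions; my
    # understanding is that a grouped UDF still receives its whole
    # group as one batch, so this may not apply here.
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
    .getOrCreate()
)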
The longer error message:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-...> in <module>()
----> 1 device_attack_result.count()
2
3
4
/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
520 2
521 """
--> 522 return int(self._jdf.count())
523
524 @ignore_unicode_prefix
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...