0

我正在使用 DataBricks Connect 在远程集群上运行 PySpark 应用程序。当另一列具有特定值时,我在尝试检索列的最小值时遇到问题。运行以下行时:

feat_min = df.filter(df['target'] == 1).select(
            F.min(F.col('feat')).alias('temp')).first().temp

我收到此错误:

Exception has occurred: Py4JJavaError
An error occurred while calling o5043.collectToPython.
: java.lang.StackOverflowError
    at scala.collection.TraversableLike.builder$1(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map(TraversableLike.scala:237)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)

Java 堆栈跟踪很长,但根本没有信息。同样,Python 堆栈跟踪仅指向它失败的行并且不提供任何有用的信息。

数据框非常小,1000 行或更少。当直接在同一个集群上运行代码时,问题不会发生。当在安装了 PySpark 的不同 conda 环境中本地运行它时,它也不会发生。

我看到了这个问题maxResultSize并按照建议进行了更改。我尝试了 10g 和 0(无限)都无济于事。

我认为这应该与我本地计算机上的 Spark 配置有关,但除了maxResultSize我没有更改 Databricks Connect 安装的默认设置之外的任何内容。顺便说一句,按照说明,DB Connect 安装在没有 PySpark 的单独 conda 环境中。我的本地机器和集群上都运行了 Python 3.8.10,并且为我的 DBR 安装了正确的 DB Connect 版本。

如果有任何帮助,这是我的 Spark 配置:

('spark.app.startTime', '1637931933606')
('spark.sql.catalogImplementation', 'in-memory')
('spark.driver.host', '192.168.0.36')
('spark.app.name', 'project')
('spark.executor.id', 'driver')
('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
('spark.sql.warehouse.dir', 'file:/home/user/project/spark-warehouse')
('spark.rdd.compress', 'True')
('spark.app.id', 'local-1637931934443')
('spark.serializer.objectStreamReset', '100')
('spark.driver.maxResultSize', '0')
('spark.master', 'local[*]')
('spark.submit.pyFiles', '')
('spark.submit.deployMode', 'client')
('spark.ui.showConsoleProgress', 'true')
('spark.driver.port', '45897')
('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

提前感谢任何输入,我对 Spark 还是很陌生,让 DB Connect 正常工作将是天赐之物。

4

1 回答 1

0

请先尝试聚合,请测试以下代码:

feat_min = df.filter(df['target'] == 1).agg(F.min(F.col('feat'))).first()[0]
于 2021-11-26T13:59:41.040 回答