1

我使用 spark NLP 解析了 500k 条推文作为测试。数据框看起来不错。我将数组转换为字符串。使用

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'

array_to_string_udf = udf(array_to_string, StringType())

result = result.withColumn('token', array_to_string_udf(result["token"])).withColumn('ner', array_to_string_udf(result["ner"])).withColumn('embeddings', array_to_string_udf(result["embeddings"])).withColumn('ner_chunk', array_to_string_udf(result["ner_chunk"])).withColumn('document', array_to_string_udf(result["document"]))

数据框看起来不错。但是,每当我尝试将其转换为 pandas 时,将其导出为 csv 我都会收到以下错误

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 584, in main
  File "C:\spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 562, in read_int
    length = stream.read(4)
  File "C:\ProgramData\Anaconda3\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

这让我觉得 spark 不是在和 python 说话。有谁知道问题可能是什么?

4

1 回答 1

0

调用toPandas时

所有数据都加载到驱动程序的内存中

调用的效果与调用collecttoPandas的效果基本相同。

将数据帧的内容写入 csv 的更好方法是直接使用 PySpark 的DataFrameWriter

result.write.csv('my_result.csv')

编辑:可能与问题没有直接关系,但可以用原生 Spark 函数(litconcatconcat_ws)替换 udf :

from pyspark.sql import functions as F

result.withColumn("token", F.concat(F.lit("["), F.concat_ws(",", "token"),F.lit("]")))....

用 Spark 函数替换 udf 将提高性能。

于 2021-08-01T20:31:26.687 回答