34

我正在将 CSV 作为 Spark DataFrame 读取并对其执行机器学习操作。我不断收到 Python 序列化 EOFError - 知道为什么吗?我认为这可能是一个内存问题 - 即文件超出了可用 RAM - 但大幅减少 DataFrame 的大小并不能防止 EOF 错误。

玩具代码和错误如下。

#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)

#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()

在单个节点上重复运行上述代码spark-submit会引发以下错误,即使在拟合模型之前 DataFrame 的大小已减小(例如tinydf = df.sample(False, 0.00001)

Traceback (most recent call last):
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157, 
     in manager
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61, 
     in worker
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136, 
     in main if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545, 
     in read_int
    raise EOFError
  EOFError
4

3 回答 3

3

该错误似乎发生在 pySpark read_int 函数中。spark站点的代码如下:

def read_int(stream):
length = stream.read(4)
if not length:
    raise EOFError
return struct.unpack("!i", length)[0]

这意味着当从流中读取 4 个字节时,如果读取了 0 个字节,则会引发 EOF 错误。python 文档在这里

于 2017-08-18T12:59:23.203 回答
0

您是否检查过代码中出现 EOError 的位置?

我的猜测是它会在您尝试定义 df 时出现,因为这是您的代码中文件实际尝试读取的唯一位置。

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

在这一行之后的每一点,您的代码都在使用变量df,而不是文件本身,因此这行似乎很可能正在生成错误。

测试是否是这种情况的一种简单方法是注释掉其余代码,和/或在上面的行之后放置这样的行。

print(len(df))

另一种方法是使用try循环,例如:

try:
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')
except:
    print("Failed to load file into df!")

如果事实证明该行是产生 EOFError 的行,那么您一开始就永远不会获得数据帧,因此尝试减少它们不会有任何区别。

如果那是产生错误的行,那么会想到两种可能性:

  1. 您的代码之前调用了一个或两个 .csv 文件,并且没有在此行之前关闭它。如果是这样,只需在此处的代码上方将其关闭。

  2. .csv 文件本身有问题。尝试在这段代码之外加载它们,看看你是否可以首先使用 csv.reader 之类的东西将它们正确地放入内存,并以你期望的方式操作它们。

于 2018-09-20T11:13:17.647 回答
0

我遇到了同样的问题,不知道如何调试它。似乎它会导致执行器线程卡住并且永远不会返回任何东西。

于 2020-07-14T14:07:34.757 回答