The following code tries to read a set of files from an input S3 path and write them back out to S3, one folder per value of a date column in the input data.
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import date, datetime
from pyspark.sql.functions import *
from time import sleep
from pyspark.sql import SQLContext
import os
import math
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--input", help="input s3 path")
parser.add_argument("--output", help="output s3 path")
args = parser.parse_args()
if args.input:
    input = args.input
if args.output:
    output = args.output
sc = SparkContext("local", "Simple App")
sqlContext = SQLContext(sc)
m1 = input
# read every tab-separated part file under the input path into an RDD of split rows
mrdd1 = sc.textFile(m1, use_unicode=False).map(lambda x: x.split('\t'))
fileSchema = {"fields":
[
{"metadata":{},"name":"col1","nullable": True,"type":"string","max_length":10},
{"metadata":{},"name":"col2","nullable": True,"type":"string","max_length":100},
{"metadata":{},"name":"col3","type":"string","nullable": True,"max_length":100},
{"metadata":{},"name":"col4","nullable": True,"type":"string","max_length":100,"date_mask":"yyyy-mm-dd"},
{"metadata":{},"name":"col5","nullable": True,"type":"string","max_length":100},
{"metadata":{},"name":"col6","nullable": True,"type":"string","max_length":100},
{"metadata":{},"name":"col7","nullable": True,"type":"string","max_length":100},
{"metadata":{},"name":"col8","nullable": True,"type":"string","max_length":100},
{"metadata":{},"name":"col9","nullable": True,"type":"string","max_length":100}
]}
# build a DataFrame from the RDD of split lines using the schema above
def convertToDf(rdd, fileSchema):
    schema = StructType.fromJson(fileSchema)
    df = sqlContext.createDataFrame(rdd, schema)
    return df
monthlyrddtable1 = convertToDf(mrdd1, fileSchema)
# derive the output folder name from the date column (col4): yyyy-mm-dd -> yyyymmdd.txt.gz
newDf = monthlyrddtable1.withColumn("filename", concat(regexp_replace(col("col4"), "-", ""), lit(".txt.gz")))
# write gzip-compressed, tab-separated CSV, one S3 folder per filename value
newDf.write.save(path=output, mode="overwrite", format='csv', compression='gzip', sep='\t', partitionBy="filename")
The input is about 8,000 part files of roughly 15 MB each. The output should end up in about 900 folders, since there are 900 distinct dates. After running for about an hour and a half, the job times out or errors out following S3 write failures. I suspect the problem is that it writes too many small part files to S3. I'm new to Spark; is there a way to set the right parameters to speed this up and avoid producing so many S3 files?
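For what it's worth, one change I was considering (untested, just a sketch) is to repartition by the derived filename column before the write, so that all rows for a given date land in the same task and each date folder is written as one larger part file instead of thousands of tiny ones:

# untested sketch: co-locate all rows of a given date in one partition before writing,
# so each filename=<yyyymmdd>.txt.gz folder gets a single part file
repartitionedDf = newDf.repartition("filename")
repartitionedDf.write.save(path=output, mode="overwrite", format='csv',
                           compression='gzip', sep='\t', partitionBy="filename")

Is that the right direction, or are there write-side or S3-side settings I should be tuning instead? The error from the failing run is below: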
17/04/26 21:23:24 ERROR CSEMultipartUploadOutputStream: failed to upload object xxxx/xxxxx/xxxxxxxx/xxxxxxx/xxxxxx/ed=20130502.txt.gz/part-00017-xxxxxxxxxxxxxxxxxxxxx.csv.gz, trying to close piped input stream.
17/04/26 21:23:24 ERROR Utils: Aborting task
java.io.IOException: Error closing multipart upload
at com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream.close(CSEMultipartUploadOutputStream.java:196)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:111)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVRelation.scala:266)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:388)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: null; Status Code: 0; Error Code: InternalError; Request ID: xxxxxxxxxxxxxx), S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
at com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream$3.run(CSEMultipartUploadOutputStream.java:155)
... 1 more
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: null; Status Code: 0; Error Code: InternalError; Request ID: Xxxxxxxxxxxxxxxxxxxxxxxx), S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$CompleteMultipartUploadHandler.doEndElement(XmlResponsesSaxParser.java:1698)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.transform.AbstractHandler.endElement(AbstractHandler.java:52)
at org.apache.xerces.parsers.AbstractSAXParser.endElement(Unknown Source)
at org.apache.xerces.impl.XMLNSDocumentScannerImpl.scanEndElement(Unknown Source)
at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)