当我用
dataframe.coalesce(1).write.format('json')
在 pyspark 上,我无法更改分区中的文件名
我这样写我的JSON:
dataframe.coalesce(1).write.format('json').mode('overwrite').save('path')
但我无法更改分区中的文件名
我想要这样的路径:
/文件夹/my_name.json
其中“my_name.json”是一个 json 文件
当我用
dataframe.coalesce(1).write.format('json')
在 pyspark 上,我无法更改分区中的文件名
我这样写我的JSON:
dataframe.coalesce(1).write.format('json').mode('overwrite').save('path')
但我无法更改分区中的文件名
我想要这样的路径:
/文件夹/my_name.json
其中“my_name.json”是一个 json 文件
在 sparkwe can't control name of the file中写入目录。
首先将数据写入,HDFS directory然后为更改我们需要使用的文件名HDFS api。
Example:
In Pyspark:
l=[("a",1)]
ll=["id","sa"]
df=spark.createDataFrame(l,ll)
hdfs_dir = "/folder/" #your hdfs directory
new_filename="my_name.json" #new filename
df.coalesce(1).write.format("json").mode("overwrite").save(hdfs_dir)
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
#list files in the directory
list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dir))
#filter name of the file starts with part-
file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]
#rename the file
fs.rename(Path(hdfs_dir+''+file_name),Path(hdfs_dir+''+new_filename))
如果要success files在目录中删除,请使用fs.delete删除_Success文件。
In Scala:
val df=Seq(("a",1)).toDF("id","sa")
df.show(false)
import org.apache.hadoop.fs._
val hdfs_dir = "/folder/"
val new_filename="new_json.json"
df.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)
val fs=FileSystem.get(sc.hadoopConfiguration)
val f=fs.globStatus(new Path(s"${hdfs_dir}" + "*")).filter(x => x.getPath.getName.toString.startsWith("part-")).map(x => x.getPath.getName).mkString
fs.rename(new Path(s"${hdfs_dir}${f}"),new Path(s"${hdfs_dir}${new_filename}"))
fs.delete(new Path(s"${hdfs_dir}" + "_SUCCESS"))
扩展接受的答案。对于使用AWS S3的人
以下为我工作,
# Save the file to S3 bucket
spark_df.repartition(1).write.mode('append').parquet("s3://bucket_name/folder_name")
myPath = "s3://bucket_name/folder_name/*"
hadoopPath = SparkContext._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(SparkContext._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)
file_name = [file.getPath().getName() for file in statuses if file.getPath().getName().startswith('part-')][0]
hadoopFs.rename(SparkContext._jvm.org.apache.hadoop.fs.Path(f"s3://bucket_name/folder_name/{file_name}"), SparkContext._jvm.org.apache.hadoop.fs.Path("s3://bucket_name/folder_name/myFile.parquet"))