我按月从 RDBMS 中卸载了一堆数据,并根据该月将其加载到 Google Cloud Storage (GCS) 中。然后,我将整个数据集读入 dataproc 集群上的 pyspark 数据帧中,并希望根据天而不是月份将其重新写入 GCS。我已成功写入云存储,其中每个文件仅包含某个日期,但未能有效地根据该日期命名文件或目录。下面的代码做了我想做的事,但效率很低。我也知道理论上我可以通过使用镶木地板文件来解决这个问题,但我的要求是写成 CSV。最终,我想每天将这些数据加载到 bigquery 中,如果有一个更容易的解决方案(然后我可以将每个每天的表导出到文件中)。
# Find distinct dates, and then write based on that.
dates = sqlContext.sql("SELECT distinct THE_DATE FROM tbl")
x = dates.collect()
for d in x:
date = d.SLTRN_DT
single_wk = sqlContext.sql("SELECT * FROM tbl where THE_DATE = '{}'".format(date))
towrite = single_wk.map(to_csv)
towrite.coalesce(4).saveAsTextFile('gs://buck_1/AUDIT/{}'.format(date))
所以说我读入的数据有日期 ['2014-01-01', '2014-01-02', '2014-01-03'] 我希望生成的文件/目录看起来像这样:
gs://buck_1/AUDIT/2014-01-01/part-1
gs://buck_1/AUDIT/2014-01-01/part-2
gs://buck_1/AUDIT/2014-01-01/part- 3
gs://buck_1/AUDIT/2014-01-01/part-4
gs://buck_1/AUDIT/2014-01-02/part-1
gs://buck_1/AUDIT/2014-01-02/part-2
gs://buck_1/AUDIT/2014-01-02/part- 3
gs://buck_1/AUDIT/2014-01-02/part-4
gs://buck_1/AUDIT/2014-01-03/part-1
gs://buck_1/AUDIT/2014-01-03/part-2
gs://buck_1/AUDIT/2014-01-03/part- 3
gs://buck_1/AUDIT/2014-01-03/part-4