1

我有一个事件案例类的数据集,我想将其中的 json 字符串元素保存到 s3 上的文件中,其路径类似于 bucketName/service/yyyy/mm/dd/hh/[SomeGuid].gz

例如,事件案例类如下所示:

case class Event(
  hourPath: String,  // e.g. bucketName/service/yyyy/mm/dd/hh/ 
  json: String  // The json line that represents this particular event.
  ... // Other properties used in earlier transformations.
)

有没有办法保存我们将属于特定时间的事件写入 s3 上的文件的数据集?

在 DataframeWriter 上调用 partitionBy 是我能得到的最接近的方法,但文件路径并不是我想要的。

4

1 回答 1

1

您可以迭代每个项目并将其写入 S3 中的文件。使用 Spark 执行此操作很有效,因为它将并行执行。

这段代码对我有用:

val tempDS = eventsDS.rdd.collect.map(x => saveJSONtoS3(x.hourPath,x.json))

def saveJSONtoS3(path: String, jsonString: String) : Unit = {
    val bucketName = path.substring(0,path.indexOf('/'));
    val file = path.substring(bucketName.length()+1);

    val creds = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
    val amazonS3Client = new AmazonS3Client(creds)
    val meta = new ObjectMetadata();
    amazonS3Client.putObject(bucketName, file, new ByteArrayInputStream(jsonString.getBytes), meta)
 }

您需要导入:

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.model.ObjectMetadata

您需要包含aws-java-sdk库。

于 2019-07-25T07:20:01.473 回答