0

我在 AWS EMR 上运行的 spark 应用程序从存储在 S3 中的 JSON 数组加载数据。然后通过 Spark 引擎处理从中创建的 Dataframe。

我的源 JSON 数据采用多个 S3 对象的形式。我需要将它们压缩成一个 JSON 数组,以减少从我的 Spark 应用程序中读取的 S3 对象的数量。我尝试使用“s3-dist-cp --groupBy”,但结果是连接的 JSON 数据,它本身不是有效的 JSON 文件,所以我无法从中创建数据框。

这里有一个简化的例子来进一步说明它。

源数据:

S3 对象 Record1.json :{“名称”:“约翰”,“城市”:“伦敦”}

S3 对象 Record2.json :{“名称”:“玛丽”,“城市”:“巴黎”}

s3-dist-cp --src s3://source/ --dest s3://dest/ --groupBy='.*Record.*(\w+)'

汇总输出

{“姓名”:“玛丽”,“城市”:“巴黎”}{“姓名”:“约翰”,“城市”:“伦敦”}

我需要的 :

[{“姓名”:“约翰”,“城市”:“伦敦”},{“姓名”:“玛丽”,“城市”:“巴黎”}]

申请代码供参考

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
val schema = new StructType()
                 .add("Name",StringType,true)
                 .add("City",StringType,true)

val df = spark.read.option("multiline","true").schema(schema).json("test.json")
df.show()

预期产出

+----+------+

|姓名| 城市|

+----+------+

|约翰|伦敦|

|玛丽| 巴黎|

+----+------+

s3-dist-cp 是我需要的正确工具吗?对于将由 Spark 应用程序加载为 Dataframe 的 json 数据聚合的任何其他建议?

4

1 回答 1

0

或者,您可以使用regexp_replace将单行字符串替换为 json 格式的多行字符串,然后再将其转换为数据集。

检查样品

val df = spark.read.text("test.json")\
    .withColumn("json", from_json(regexp_replace(col("value"), "\}\{", "\}\n\{"), schema))\
        .select("json.*")

df.show()

关于 regexp_replacePyspark 替换 Spark 数据框列中的字符串

于 2020-04-07T21:48:20.447 回答