在 Spark 中,我们遇到了同样的问题。我们正在使用以下内容:
from pyspark.sql.functions import *
@udf
def concatenated_json_to_array(text):
final = "["
separator = ""
for part in text.split("}{"):
final += separator + part
separator = "}{" if re.search(r':\s*"([^"]|(\\"))*$', final) else "},{"
return final + "]"
def read_concatenated_json(path, schema):
return (spark.read
.option("lineSep", None)
.text(path)
.withColumn("value", concatenated_json_to_array("value"))
.withColumn("value", from_json("value", schema))
.withColumn("value", explode("value"))
.select("value.*"))
它的工作原理如下:
- 将数据读取为每个文件一个字符串(无分隔符!)
- 使用 UDF 引入 JSON 数组并通过引入逗号来拆分 JSON 对象。注意:小心不要破坏
}{
其中的任何字符串!
- 将带有模式的 JSON 解析为 DataFrame 字段。
- 将数组分解为单独的行
- 将值对象展开为列。
像这样使用它:
from pyspark.sql.types import *
schema = ArrayType(
StructType([
StructField("type", StringType(), True),
StructField("value", StructType([
StructField("id", IntegerType(), True),
StructField("joke", StringType(), True),
StructField("categories", ArrayType(StringType()), True)
]), True)
])
)
path = '/mnt/my_bucket_name/messages/*/*/*/*/'
df = read_concatenated_json(path, schema)
我在这里写了更多细节和注意事项:Parsing JSON data from S3 (Kinesis) with Spark。不要只用 分割}{
,因为它会弄乱你的字符串数据!例如:{ "line": "a\"r}{t" }
。