4

I have a large dataset stored in a S3 bucket, but instead of being a single large file, it's composed of many (113K to be exact) individual JSON files, each of which contains 100-1000 observations. These observations aren't on the highest level, but require some navigation within each JSON to access. i.e. json["interactions"] is a list of dictionaries.

I'm trying to utilize Spark/PySpark (version 1.1.1) to parse through and reduce this data, but I can't figure out the right way to load it into an RDD, because it's neither all records > one file (in which case I'd use sc.textFile, though added complication here of JSON) nor each record > one file (in which case I'd use sc.wholeTextFiles).

Is my best option to use sc.wholeTextFiles and then use a map (or in this case flatMap?) to pull the multiple observations from being stored under a single filename key to their own key? Or is there an easier way to do this that I'm missing?

I've seen answers here that suggest just using json.loads() on all files loaded via sc.textFile, but it doesn't seem like that would work for me because the JSONs aren't simple highest-level lists.

4

3 回答 3

10

前面的答案不会以分布式方式读取文件(请参阅参考资料)。为此,您需要并行化 s3 键,然后在下面的 flatMap 步骤中读取文件。

import boto3
import json
from pyspark.sql import Row

def distributedJsonRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Key)
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8'))
    for dicts in content['interactions']
        yield Row(**dicts)

pkeys = sc.parallelize(keyList) #keyList is a list of s3 keys
dataRdd = pkeys.flatMap(distributedJsonRead)

Boto3 参考

于 2016-04-07T18:00:54.453 回答
6

使用 DataFrame 怎么样?

确实 testFrame = sqlContext.read.json('s3n://<bucket>/<key>') 从一个文件中为您提供了您想要的东西?

每个观察是否都有相同的“列”(键数)?

如果是这样,您可以使用 boto 列出要添加的每个对象,将它们读入并将它们相互结合。

from pyspark.sql import SQLContext
import boto3
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

s3 = boto3.resource('s3')
bucket = s3.Bucket('<bucket>')

aws_secret_access_key = '<secret>'
aws_access_key_id = '<key>'

#Configure spark with your S3 access keys
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)
object_list = [k for k in bucket.objects.all() ]
key_list = [k.key for k in bucket.objects.all()]

paths = ['s3n://'+o.bucket_name+'/'+ o.key for o in object_list ]

dataframes = [sqlContext.read.json(path) for path in paths]

df = dataframes[0]
for idx, frame in enumerate(dataframes):
    df = df.unionAll(frame)

我是新手,所以我想知道是否有更好的方法来使用包含大量 s3 文件的数据帧,但到目前为止,这对我有用。

于 2015-11-20T21:56:34.940 回答
3

该名称具有误导性(因为它是单数),但sparkContext.textFile()(至少在 Scala 的情况下)也接受目录名称或通配符路径,因此您只能说textFile("/my/dir/*.json").

于 2015-02-24T00:59:43.173 回答