我正在尝试生成存储桶/文件夹中所有 S3 文件的列表。文件夹中通常有数百万个文件。我现在使用 boto,它每分钟可以检索大约 33k 个文件,即使是一百万个文件,也需要半小时。我还将这些文件加载到数据框中,但生成并使用此列表作为跟踪正在处理的文件的一种方式。
我注意到的是,当我要求 Spark 读取文件夹中的所有文件时,它会列出自己的列表,并且能够比 boto 调用更快地列出它们,然后处理这些文件。我在 PySpark 中查找了一种方法,但没有找到好的示例。我得到的最接近的是一些 Java 和 Scala 代码来列出使用 HDFS 库的文件。
有没有办法在 Python 和 Spark 中做到这一点?作为参考,我正在尝试复制以下代码片段:
def get_s3_files(source_directory, file_type="json"):
s3_resource = boto3.resource("s3")
file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
bucket_name = str(source_directory.parts[3])
prefix = "/".join(source_directory.parts[4:])
bucket = s3_resource.Bucket(bucket_name)
s3_source_files = []
for object in bucket.objects.filter(Prefix=prefix):
if object.key.endswith(f".{file_type}"):
s3_source_files.append(
(
f"{file_prepend_path}/{object.key}",
object.size,
str(source_directory),
str(datetime.now()),
)
)
return s3_source_files