I am using PySpark 2.4.4.
I want to load some parquet files from an S3 bucket into a Spark dataframe, and I want to read all of the files at once.
I have been looking at how to do this in these links:
- How to read parquet data from S3 to spark dataframe python?
- Unable to read data from s3 bucket using spark
- https://gist.github.com/asmaier/5768c7cda3620901440a62248614bbd0
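For reference, the shape of the call I am after is roughly the sketch below (the bucket and prefix are placeholders, not my real paths). As far as I understand, Spark's parquet reader accepts a directory or a glob, so pointing it at a prefix should load every matching file into a single dataframe:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-all-parquet").getOrCreate()

# Point the reader at a prefix (or a glob) and Spark reads every
# matching parquet file into one dataframe.
# "my-bucket/2020/10/17" is a placeholder, not my real path.
df = spark.read.parquet("s3a://my-bucket/2020/10/17/*.parquet")
df.printSchema()
```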
I tried to load the files in several ways, but I could not get them to load. For example:
```python
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import databricks.koalas as ks
import boto3
from boto3.session import Session
import botocore
from zipfile import ZipFile
import urllib
import datetime
from s3fs import S3FileSystem
import dask.dataframe as dd
aws_region = 'ap-southeast-1'
# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<myimage>")
sparkConf.set("spark.kubernetes.container.image.pullSecrets", "<secret>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29413")
sparkConf.set("spark.driver.host", "<HOST>")
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("com.amazonaws.services.s3.enableV4", "true")
sparkConf.set("fs.s3a.access.key", "<mykey>")
sparkConf.set("fs.s3a.secret.key", "<mysecret>")
sparkConf.set("fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
sparkConf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")
# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
df = spark.read.parquet("s3a://<path>")
```
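One thing I am not sure about in this first attempt: my understanding is that S3A options set on a `SparkConf` only reach the Hadoop layer when they carry the `spark.hadoop.` prefix, so the bare `fs.s3a.*` keys above may simply be ignored. A sketch of what I mean (same values, prefixed keys; an assumption on my part, not a confirmed fix):

```python
# Sketch (assumption): prefix the S3A keys with "spark.hadoop." so Spark
# copies them into the underlying Hadoop configuration.
sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("spark.hadoop.fs.s3a.access.key", "<mykey>")
sparkConf.set("spark.hadoop.fs.s3a.secret.key", "<mysecret>")
sparkConf.set("spark.hadoop.fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")
```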
I also tried:
```python
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import databricks.koalas as ks
import boto3
from boto3.session import Session
import botocore
from zipfile import ZipFile
import urllib
import datetime
from s3fs import S3FileSystem
import dask.dataframe as dd
aws_region = 'ap-southeast-1'
# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<myimage>")
sparkConf.set("spark.kubernetes.container.image.pullSecrets", "<secret>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29413")
sparkConf.set("spark.driver.host", "<HOST>")
# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf=sc._jsc.hadoopConfiguration()
aws_region = 'ap-southeast-1'
# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", "<KEY>")
hadoop_conf.set("fs.s3a.secret.key", "<SECRET>")
hadoop_conf.set("fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")
import pyspark
date = datetime.datetime.today() - datetime.timedelta(days=2)
path = '<path>'
sql = pyspark.sql.SparkSession(sc)
df = spark.read.parquet("s3a://" + path)
```
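As an aside, I am also not certain that `sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")` reaches the executor JVMs; a variant I have seen suggested (again an assumption, not something I have verified here) passes it as a JVM option on both sides:

```python
# Sketch (assumption): pass the V4-signing flag to the driver and
# executor JVMs instead of setting it only as a driver-side property.
sparkConf.set("spark.driver.extraJavaOptions",
              "-Dcom.amazonaws.services.s3.enableV4=true")
sparkConf.set("spark.executor.extraJavaOptions",
              "-Dcom.amazonaws.services.s3.enableV4=true")
```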
But I get this error:

```
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-14c1e166e21f> in <module>
1 date = datetime.datetime.today() - datetime.timedelta(days=2)
----> 2 df = spark.read.parquet(f"s3a://cp-datadumps/MCF/2020/10/17/advances/advances.parquet_0_0_0.snappy.parquet")
/usr/local/spark/python/pyspark/sql/readwriter.py in parquet(self, *paths)
314 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
315 """
--> 316 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
317
318 @ignore_unicode_prefix
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o209.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:644)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
... 30 more
```
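If I read the trace correctly, `ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found` means the S3A classes are not on the JVM classpath at all, so the `fs.s3a.*` settings never get a chance to matter. Spark 2.4.4 is built against Hadoop 2.7.3, which does not bundle them, so a sketch of what I believe would pull them in (the version is my guess to match Hadoop 2.7.3; not verified on my cluster):

```python
# Sketch (assumption): pull in hadoop-aws (and, transitively, the AWS
# Java SDK it was built against) before the session is created, so that
# org.apache.hadoop.fs.s3a.S3AFileSystem ends up on the classpath.
sparkConf.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")
```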
I know the path is correct, because using dask I am able to load the data:
```python
from s3fs import S3FileSystem
import dask.dataframe as dd

storage_options = {
"key": "<MYKEY>",
"secret": "<MYSECRET>",
}
s3 = S3FileSystem(**storage_options)
s3.invalidate_cache()
df1 = dd.read_parquet("s3://<path>", storage_options=storage_options)
```