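I can't connect to Snowflake from a dockerized PySpark container. So far, neither the Snowflake documentation nor the PySpark documentation has helped.
I'm installing the following versions, which can also be seen in the Dockerfile below:
- Python 3.7.12
- pyspark 3.1.1
- Hadoop 3.2
- jre-1.8.0-openjdk
- snowflake-jdbc-3.13.15.jar
- spark-snowflake_2.12-2.10.0-spark_3.1.jar
- snowflake-connector-python 2.7.4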
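The connector jar name in the list above encodes the Scala version and the Spark line it was built for, so a quick sanity check against the installed Spark (3.1.1, whose prebuilt distribution uses Scala 2.12) is cheap. A minimal sketch, assuming the `spark-snowflake_<scala>-<connector>-spark_<spark>.jar` naming seen here holds in general; `parse_connector_jar`, `matches_spark` and `maven_coordinate` are hypothetical helpers, not part of any Snowflake tooling. The last one derives the `group:artifact:version` form that `spark-submit --packages` expects, as an alternative to pointing at local jar files.

```python
import re
from typing import NamedTuple, Optional

# Assumed naming pattern, taken from the jar listed above:
# spark-snowflake_<scala version>-<connector version>-spark_<spark major.minor>.jar
_CONNECTOR_JAR = re.compile(
    r"^spark-snowflake_(?P<scala>\d+\.\d+)-(?P<connector>\d+(?:\.\d+)*)"
    r"-spark_(?P<spark>\d+\.\d+)\.jar$"
)


class ConnectorInfo(NamedTuple):
    scala: str      # Scala binary version, e.g. "2.12"
    connector: str  # connector release, e.g. "2.10.0"
    spark: str      # Spark line the connector targets, e.g. "3.1"


def parse_connector_jar(filename: str) -> Optional[ConnectorInfo]:
    """Split a spark-snowflake jar name into its version parts, or return None."""
    match = _CONNECTOR_JAR.match(filename)
    if match is None:
        return None
    return ConnectorInfo(match["scala"], match["connector"], match["spark"])


def matches_spark(info: ConnectorInfo, spark_version: str) -> bool:
    """True if the connector targets the same major.minor line as spark_version."""
    return info.spark == ".".join(spark_version.split(".")[:2])


def maven_coordinate(info: ConnectorInfo) -> str:
    """Build the group:artifact:version string used by spark-submit --packages."""
    return f"net.snowflake:spark-snowflake_{info.scala}:{info.connector}-spark_{info.spark}"
```

For `spark-snowflake_2.12-2.10.0-spark_3.1.jar` this yields `net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1`; the JDBC driver uses `net.snowflake:snowflake-jdbc:3.13.15` in the same notation. Note that `--packages` resolves artifacts from a repository at submit time, so the container needs network access for that route.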
make run
..starting docker container..
docker run --interactive --tty \
--volume /src:/src \
--volume /data/:/root/data \
--volume /jars:/jars \
reports bash '-c' "cp -r /jars /opt/spark-3.1.1-bin-hadoop3.2/jars && cd /home && export PYTHONIOENCODING=utf8 && spark-submit \
/src/reports.py \
--jars net.snowflake:/jars/snowflake-jdbc-3.13.14.jar,net.snowflake:/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar \
--partitions-output "4" \
1> >(sed $'s,.*,\e[32m&\e[m,' >&2)" || true
22/03/01 18:58:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/03/01 18:58:48 INFO SparkContext: Running Spark version 3.1.1
22/03/01 18:58:48 INFO ResourceUtils: ==============================================================
22/03/01 18:58:48 INFO ResourceUtils: No custom resources configured for spark.driver.
22/03/01 18:58:48 INFO ResourceUtils: ==============================================================
22/03/01 18:58:48 INFO SparkContext: Submitted application: reports.py
22/03/01 18:58:48 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/03/01 18:58:48 INFO ResourceProfile: Limiting resource is cpu
22/03/01 18:58:48 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/03/01 18:58:48 INFO SecurityManager: Changing view acls to: root
22/03/01 18:58:48 INFO SecurityManager: Changing modify acls to: root
22/03/01 18:58:48 INFO SecurityManager: Changing view acls groups to:
22/03/01 18:58:48 INFO SecurityManager: Changing modify acls groups to:
22/03/01 18:58:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/03/01 18:58:48 INFO Utils: Successfully started service 'sparkDriver' on port 43005.
22/03/01 18:58:48 INFO SparkEnv: Registering MapOutputTracker
22/03/01 18:58:48 INFO SparkEnv: Registering BlockManagerMaster
22/03/01 18:58:48 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/03/01 18:58:48 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/03/01 18:58:48 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/03/01 18:58:48 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-08fa4812-277a-4fcd-9a3a-8fa20910960d
22/03/01 18:58:48 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
22/03/01 18:58:48 INFO SparkEnv: Registering OutputCommitCoordinator
22/03/01 18:58:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/03/01 18:58:48 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://dcc55fed507c:4040
22/03/01 18:58:48 INFO SparkContext: Added JAR /jars/snowflake-jdbc-3.13.15.jar at spark://dcc55fed507c:43005/jars/snowflake-jdbc-3.13.15.jar with timestamp 1646161128297
22/03/01 18:58:48 INFO SparkContext: Added JAR /jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar at spark://dcc55fed507c:43005/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar with timestamp 1646161128297
22/03/01 18:58:48 INFO Executor: Starting executor ID driver on host dcc55fed507c
22/03/01 18:58:48 INFO Executor: Fetching spark://dcc55fed507c:43005/jars/snowflake-jdbc-3.13.15.jar with timestamp 1646161128297
22/03/01 18:58:48 INFO TransportClientFactory: Successfully created connection to dcc55fed507c/172.17.0.2:43005 after 19 ms (0 ms spent in bootstraps)
22/03/01 18:58:48 INFO Utils: Fetching spark://dcc55fed507c:43005/jars/snowflake-jdbc-3.13.15.jar to /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/fetchFileTemp6096133446952027318.tmp
22/03/01 18:58:49 INFO Executor: Adding file:/tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/snowflake-jdbc-3.13.15.jar to class loader
22/03/01 18:58:49 INFO Executor: Fetching spark://dcc55fed507c:43005/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar with timestamp 1646161128297
22/03/01 18:58:49 INFO Utils: Fetching spark://dcc55fed507c:43005/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar to /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/fetchFileTemp5234593641216600025.tmp
22/03/01 18:58:49 INFO Executor: Adding file:/tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/spark-snowflake_2.12-2.10.0-spark_3.1.jar to class loader
22/03/01 18:58:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37765.
22/03/01 18:58:49 INFO NettyBlockTransferService: Server created on dcc55fed507c:37765
22/03/01 18:58:49 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/03/01 18:58:49 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO BlockManagerMasterEndpoint: Registering block manager dcc55fed507c:37765 with 366.3 MiB RAM, BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/spark-warehouse').
Traceback (most recent call last):
  File "/src/reports.py", line 4, in <module>
    class Reports:
  File "/src/reports.py", line 37, in Reports
    .option("query", "select * from Users limit 10")
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: java.lang.ClassNotFoundException: Failed to find data source: snowflake. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:689)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:743)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: snowflake.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:663)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:663)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:663)
	... 14 more
22/03/01 18:58:50 INFO SparkContext: Invoking stop() from shutdown hook
22/03/01 18:58:50 INFO SparkUI: Stopped Spark web UI at http://dcc55fed507c:4040
22/03/01 18:58:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/03/01 18:58:50 INFO MemoryStore: MemoryStore cleared
22/03/01 18:58:50 INFO BlockManager: BlockManager stopped
22/03/01 18:58:50 INFO BlockManagerMaster: BlockManagerMaster stopped
22/03/01 18:58:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/03/01 18:58:50 INFO SparkContext: Successfully stopped SparkContext
22/03/01 18:58:50 INFO ShutdownHookManager: Shutdown hook called
22/03/01 18:58:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2
22/03/01 18:58:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/pyspark-8b92b10f-c141-46bc-bdc4-55a98d34e594
22/03/01 18:58:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-170d1583-baf4-4b61-ba62-c016c7948006
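In the run command above, `--jars` comes after `/src/reports.py`. spark-submit only parses its own options up to the primary resource (the `.py` file) and forwards everything after it to the application, so that `--jars` (like `--partitions-output "4"`) lands in the script's `sys.argv` instead of configuring Spark. The `Added JAR` lines in the log name `snowflake-jdbc-3.13.15.jar`, which matches the `spark.jars` setting inside the script rather than the `3.13.14` jar on the command line. The `net.snowflake:` prefix also mixes Maven-coordinate syntax into what `--jars` expects to be a plain comma-separated list of paths. A minimal sketch that only assembles the argument list in the order spark-submit expects; `build_spark_submit` and `to_shell` are hypothetical helpers and nothing is executed.

```python
import shlex
from typing import List, Sequence


def build_spark_submit(app: str, jars: Sequence[str] = (), app_args: Sequence[str] = ()) -> List[str]:
    """Return a spark-submit argv with Spark options placed before the app file.

    spark-submit only parses its own options up to the primary resource
    (the .py file); anything after it is passed to the application.
    """
    argv = ["spark-submit"]
    if jars:
        # Plain comma-separated paths/URLs; Maven coordinates belong in --packages.
        argv += ["--jars", ",".join(jars)]
    argv.append(app)        # primary resource
    argv.extend(app_args)   # becomes sys.argv[1:] inside the application
    return argv


def to_shell(argv: Sequence[str]) -> str:
    """Quote an argv list for a shell (shlex.join needs Python 3.8; this works on 3.7)."""
    return " ".join(shlex.quote(arg) for arg in argv)
```

With the two jars from this question, `to_shell(build_spark_submit("/src/reports.py", jars=["/jars/snowflake-jdbc-3.13.15.jar", "/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar"], app_args=["--partitions-output", "4"]))` gives `spark-submit --jars /jars/snowflake-jdbc-3.13.15.jar,/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar /src/reports.py --partitions-output 4`.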
Dockerfile
ARG SPARK_VERSION=3.1.1
ARG HADOOP_VERSION=3.2
# Official Amazon Linux Image
FROM library/amazonlinux:latest AS builder
ARG SPARK_VERSION
ARG HADOOP_VERSION
# Change the Default Shell for 'builder'
SHELL ["/bin/bash", "-c"]
# Install Package Dependencies for Installations
RUN yum update -y && yum upgrade -y
RUN yum install -y diffutils gzip tar wget
# Install Spark
WORKDIR /opt
RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz.sha512
RUN gpg --print-md sha512 spark-*-bin-hadoop*.tgz | diff - spark-*-bin-hadoop*.tgz.sha512
RUN tar fxz spark-*-bin-hadoop*.tgz
RUN rm spark-*-bin-hadoop*.tgz*
# Copy Build Artifact
FROM library/amazonlinux:latest
COPY --from=builder /opt /opt
ARG SPARK_VERSION
ARG HADOOP_VERSION
COPY yum-requirements.txt .
RUN yum -y install $(cat yum-requirements.txt)
RUN pip3 install --upgrade pip
COPY python-requirements.txt .
RUN sed -i 's/{SPARK_VERSION}/${SPARK_VERSION}/g' python-requirements.txt
RUN pip3 install -r python-requirements.txt
COPY snowflake-requirements.txt .
RUN pip3 install -r snowflake-requirements.txt
# Configure Environment Variables
ENV SPARK_HOME=/opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} \
JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk \
PATH=/opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/bin:/usr/lib/jvm/jre-1.8.0-openjdk/bin:${PATH} \
PYSPARK_PYTHON=/usr/bin/python3
WORKDIR /home
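The run command copies the jars with `cp -r /jars /opt/spark-3.1.1-bin-hadoop3.2/jars`. Because the destination directory already exists, `cp -r` copies the `/jars` directory itself into it, so the files end up in `/opt/spark-3.1.1-bin-hadoop3.2/jars/jars/`. Spark's launch scripts put `$SPARK_HOME/jars/*` on the classpath, and a Java classpath wildcard does not descend into subdirectories. The shell equivalent of a flat copy is `cp /jars/*.jar "$SPARK_HOME/jars/"`; the sketch below does the same in Python using only the standard library. `install_jars` is a hypothetical helper name.

```python
import glob
import os
import shutil
from typing import List


def install_jars(src_dir: str, spark_home: str) -> List[str]:
    """Copy *.jar files from src_dir directly into <spark_home>/jars.

    Unlike `cp -r /jars $SPARK_HOME/jars`, this does not create a nested
    jars/jars directory, so the files match the jars/* classpath glob.
    Returns the destination paths of the copied files.
    """
    target = os.path.join(spark_home, "jars")
    os.makedirs(target, exist_ok=True)
    copied = []
    for jar in sorted(glob.glob(os.path.join(src_dir, "*.jar"))):
        copied.append(shutil.copy2(jar, target))  # copy2 returns the new file's path
    return copied
```

Inside the container this would be called as `install_jars("/jars", os.environ["SPARK_HOME"])`, with `SPARK_HOME` coming from the Dockerfile's `ENV` instruction.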
Script (/src/reports.py)
from pyspark.sql import SparkSession
import configparser

class Reports:
    spark = SparkSession.builder \
        .config('spark.jars', '/jars/snowflake-jdbc-3.13.15.jar,/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar') \
        .getOrCreate()
    config = configparser.ConfigParser()
    config.read("/src/config.ini")
    # Snowflake connection parameters
    sfparams = {
        "sfURL": f'{config["snowflake"]["account"]}.{config["snowflake"]["region"]}.snowflakecomputing.com',
        "sfAccount": config["snowflake"]["account"],
        "sfUser": config["snowflake"]["user"],
        "sfPassword": config["snowflake"]["password"],
        "sfDatabase": config["snowflake"]["database"],
        "sfSchema": config["snowflake"]["schema"],
        "sfWarehouse": config["snowflake"]["warehouse"],
    }
    SNOWFLAKE_SOURCE_NAME = "snowflake"
    df = (
        spark.read.format(SNOWFLAKE_SOURCE_NAME)
        .options(**sfparams)
        .option("query", "select * from Users limit 10")
        .load()
    )
    df.show()

if __name__ == "__main__":
    Reports()
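`spark.read.format("snowflake").load()` fails inside `DataSource.lookupDataSource`: per the trace, Spark found no registered data source with the short name `snowflake` and then failed to load a class literally named `snowflake.DefaultSource`. That points at the connector classes not being visible to the driver's class loader when `load()` runs, even though the log shows both jars being added. Before digging into classpath settings, it may be worth confirming the jar itself contains what Spark looks for. A minimal sketch using only the standard library's `zipfile`: it checks for a `.class` entry and lists the providers registered in `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister`, the service file Spark's short-name lookup reads. The helper names are hypothetical; `net.snowflake.spark.snowflake.DefaultSource` corresponds to the fully qualified source name `net.snowflake.spark.snowflake` used in Snowflake's PySpark examples, which also needs the connector on the driver's classpath.

```python
import zipfile
from typing import List

# Service file that Spark's short-name lookup reads via java.util.ServiceLoader.
SERVICE_FILE = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """True if the jar has a .class entry for the fully qualified class_name."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def registered_sources(jar_path: str) -> List[str]:
    """Provider classes listed in the jar's DataSourceRegister service file."""
    with zipfile.ZipFile(jar_path) as jar:
        if SERVICE_FILE not in jar.namelist():
            return []
        text = jar.read(SERVICE_FILE).decode("utf-8")
    providers = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # service files allow # comments
        if line:
            providers.append(line)
    return providers
```

Run inside the container against `/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar`: if `jar_contains_class(..., "net.snowflake.spark.snowflake.DefaultSource")` is true, the jar is probably fine and the problem is more likely where the jar sits on the classpath than the jar itself.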