I am unable to connect to Snowflake from a dockerized pyspark container. So far, neither the Snowflake documentation nor the pyspark documentation has helped.

I am using the following setup, which can also be seen in the Dockerfile below:

  • Python 3.7.12
  • pyspark 3.1.1
  • Hadoop 3.2
  • jre-1.8.0-openjdk
  • snowflake-jdbc-3.13.15.jar
  • spark-snowflake_2.12-2.10.0-spark_3.1.jar
  • snowflake-connector-python 2.7.4
make run
..starting docker container..
docker run --interactive --tty \
                --volume /src:/src \
                --volume /data/:/root/data \
                --volume /jars:/jars \
                reports bash '-c' "cp -r /jars /opt/spark-3.1.1-bin-hadoop3.2/jars && cd /home && export PYTHONIOENCODING=utf8 && spark-submit \
                /src/reports.py \
                --jars net.snowflake:/jars/snowflake-jdbc-3.13.14.jar,net.snowflake:/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar \
                --partitions-output "4" \
                1> >(sed $'s,.*,\e[32m&\e[m,' >&2)" || true
22/03/01 18:58:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/03/01 18:58:48 INFO SparkContext: Running Spark version 3.1.1
22/03/01 18:58:48 INFO ResourceUtils: ==============================================================
22/03/01 18:58:48 INFO ResourceUtils: No custom resources configured for spark.driver.
22/03/01 18:58:48 INFO ResourceUtils: ==============================================================
22/03/01 18:58:48 INFO SparkContext: Submitted application: reports.py
22/03/01 18:58:48 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/03/01 18:58:48 INFO ResourceProfile: Limiting resource is cpu
22/03/01 18:58:48 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/03/01 18:58:48 INFO SecurityManager: Changing view acls to: root
22/03/01 18:58:48 INFO SecurityManager: Changing modify acls to: root
22/03/01 18:58:48 INFO SecurityManager: Changing view acls groups to: 
22/03/01 18:58:48 INFO SecurityManager: Changing modify acls groups to: 
22/03/01 18:58:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/03/01 18:58:48 INFO Utils: Successfully started service 'sparkDriver' on port 43005.
22/03/01 18:58:48 INFO SparkEnv: Registering MapOutputTracker
22/03/01 18:58:48 INFO SparkEnv: Registering BlockManagerMaster
22/03/01 18:58:48 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/03/01 18:58:48 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/03/01 18:58:48 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/03/01 18:58:48 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-08fa4812-277a-4fcd-9a3a-8fa20910960d
22/03/01 18:58:48 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
22/03/01 18:58:48 INFO SparkEnv: Registering OutputCommitCoordinator
22/03/01 18:58:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/03/01 18:58:48 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://dcc55fed507c:4040
22/03/01 18:58:48 INFO SparkContext: Added JAR /jars/snowflake-jdbc-3.13.15.jar at spark://dcc55fed507c:43005/jars/snowflake-jdbc-3.13.15.jar with timestamp 1646161128297
22/03/01 18:58:48 INFO SparkContext: Added JAR /jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar at spark://dcc55fed507c:43005/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar with timestamp 1646161128297
22/03/01 18:58:48 INFO Executor: Starting executor ID driver on host dcc55fed507c
22/03/01 18:58:48 INFO Executor: Fetching spark://dcc55fed507c:43005/jars/snowflake-jdbc-3.13.15.jar with timestamp 1646161128297
22/03/01 18:58:48 INFO TransportClientFactory: Successfully created connection to dcc55fed507c/172.17.0.2:43005 after 19 ms (0 ms spent in bootstraps)
22/03/01 18:58:48 INFO Utils: Fetching spark://dcc55fed507c:43005/jars/snowflake-jdbc-3.13.15.jar to /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/fetchFileTemp6096133446952027318.tmp
22/03/01 18:58:49 INFO Executor: Adding file:/tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/snowflake-jdbc-3.13.15.jar to class loader
22/03/01 18:58:49 INFO Executor: Fetching spark://dcc55fed507c:43005/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar with timestamp 1646161128297
22/03/01 18:58:49 INFO Utils: Fetching spark://dcc55fed507c:43005/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar to /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/fetchFileTemp5234593641216600025.tmp
22/03/01 18:58:49 INFO Executor: Adding file:/tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/userFiles-41e82fa6-114a-4edf-99dc-c460ca487ae9/spark-snowflake_2.12-2.10.0-spark_3.1.jar to class loader
22/03/01 18:58:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37765.
22/03/01 18:58:49 INFO NettyBlockTransferService: Server created on dcc55fed507c:37765
22/03/01 18:58:49 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/03/01 18:58:49 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO BlockManagerMasterEndpoint: Registering block manager dcc55fed507c:37765 with 366.3 MiB RAM, BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, dcc55fed507c, 37765, None)
22/03/01 18:58:49 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/spark-warehouse').
Traceback (most recent call last):
  File "/src/reports.py", line 4, in <module>
    class Reports:
  File "/src/reports.py", line 37, in Reports
    .option("query", "select * from Users limit 10")
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/opt/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: java.lang.ClassNotFoundException: Failed to find data source: snowflake. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:689)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:743)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:266)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: snowflake.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:663)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:663)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:663)
        ... 14 more

22/03/01 18:58:50 INFO SparkContext: Invoking stop() from shutdown hook
22/03/01 18:58:50 INFO SparkUI: Stopped Spark web UI at http://dcc55fed507c:4040
22/03/01 18:58:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/03/01 18:58:50 INFO MemoryStore: MemoryStore cleared
22/03/01 18:58:50 INFO BlockManager: BlockManager stopped
22/03/01 18:58:50 INFO BlockManagerMaster: BlockManagerMaster stopped
22/03/01 18:58:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/03/01 18:58:50 INFO SparkContext: Successfully stopped SparkContext
22/03/01 18:58:50 INFO ShutdownHookManager: Shutdown hook called
22/03/01 18:58:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2
22/03/01 18:58:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-ca009a52-5c9a-40f7-b88f-e02280104cb2/pyspark-8b92b10f-c141-46bc-bdc4-55a98d34e594
22/03/01 18:58:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-170d1583-baf4-4b61-ba62-c016c7948006

Dockerfile

ARG SPARK_VERSION=3.1.1
ARG HADOOP_VERSION=3.2

# Official Amazon Linux Image
FROM library/amazonlinux:latest AS builder

ARG SPARK_VERSION
ARG HADOOP_VERSION

# Change the Default Shell for 'builder'
SHELL ["/bin/bash", "-c"]

# Install Package Dependencies for Installations
RUN yum update -y && yum upgrade -y
RUN yum install -y diffutils gzip tar wget

# Install Spark
WORKDIR /opt
RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
RUN wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz.sha512
RUN gpg --print-md sha512 spark-*-bin-hadoop*.tgz | diff - spark-*-bin-hadoop*.tgz.sha512
RUN tar fxz spark-*-bin-hadoop*.tgz
RUN rm      spark-*-bin-hadoop*.tgz*

# Copy Build Artifact
FROM library/amazonlinux:latest
COPY --from=builder /opt /opt

ARG SPARK_VERSION
ARG HADOOP_VERSION

COPY yum-requirements.txt .
RUN yum -y install $(cat yum-requirements.txt)
RUN pip3 install --upgrade pip
COPY python-requirements.txt .
RUN sed -i 's/{SPARK_VERSION}/${SPARK_VERSION}/g' python-requirements.txt
RUN pip3 install -r python-requirements.txt
COPY snowflake-requirements.txt .
RUN pip3 install -r snowflake-requirements.txt

# Configure Environment Variables
ENV SPARK_HOME=/opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} \
    JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk \
    PATH=/opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/bin:/usr/lib/jvm/jre-1.8.0-openjdk/bin:${PATH} \
    PYSPARK_PYTHON=/usr/bin/python3

WORKDIR /home

Script

from pyspark.sql import SparkSession  
import configparser

class Reports:
    spark = SparkSession.builder \
        .config('spark.jars', '/jars/snowflake-jdbc-3.13.15.jar,/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar') \
        .getOrCreate()

    config = configparser.ConfigParser()
    config.read("/src/config.ini")

    # Snowflake connection parameters
    sfparams = {
        "sfURL": f'{config["snowflake"]["account"]}.{config["snowflake"]["region"]}.snowflakecomputing.com',
        "sfAccount": config["snowflake"]["account"],
        "sfUser": config["snowflake"]["user"],
        "sfPassword": config["snowflake"]["password"],
        "sfDatabase": config["snowflake"]["database"],
        "sfSchema": config["snowflake"]["schema"],
        "sfWarehouse": config["snowflake"]["warehouse"],
    }

    SNOWFLAKE_SOURCE_NAME = "snowflake"
    df = (
        spark.read.format(SNOWFLAKE_SOURCE_NAME)
        .options(**sfparams)
        .option("query", "select * from Users limit 10")
        .load()
    )
    df.show()

if __name__ == "__main__":
    Reports()

2 Answers

The problem was how the job was being executed.

docker run --interactive --tty \
                --volume /src:/src \
                --volume /data/:/root/data \
                --volume /jars:/jars \
                reports bash '-c' "cp -r /jars /opt/spark-3.1.1-bin-hadoop3.2/jars && cd /home && export PYTHONIOENCODING=utf8 && spark-submit \
                /src/reports.py \
                --jars net.snowflake:/jars/snowflake-jdbc-3.13.14.jar,net.snowflake:/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar \
                --partitions-output "4" \
                1> >(sed $'s,.*,\e[32m&\e[m,' >&2)" || true

It should have been:

docker run --interactive --tty \
                --volume /src:/src \
                --volume /data/:/root/data \
                --volume /jars:/jars \
                reports bash '-c' "cp -r /jars /opt/spark-3.1.1-bin-hadoop3.2/jars && cd /home && export PYTHONIOENCODING=utf8 && spark-submit \
                --jars /jars/snowflake-jdbc-3.13.14.jar,/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar \
                /src/reports.py \
                --partitions-output "4" \
                1> >(sed $'s,.*,\e[32m&\e[m,' >&2)" || true

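The --jars option has to come before the script in the spark-submit command. spark-submit passes everything after the application file to the application as its own arguments, so a --jars placed after /src/reports.py never reaches Spark. The jar paths also drop the net.snowflake: prefix and are plain file paths.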
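
The ordering rule can be sketched in Python as a small helper that assembles the spark-submit command line. This is only an illustration: build_spark_submit is a hypothetical name, not part of Spark or of the original Makefile, and the paths are the ones from the command above.

```python
import shlex


def build_spark_submit(app, jars=(), app_args=()):
    """Return a spark-submit argv with Spark options before the application file.

    Hypothetical helper for illustration; not part of Spark.
    """
    cmd = ["spark-submit"]
    if jars:
        # --jars takes a single comma-separated list of jar paths
        cmd += ["--jars", ",".join(jars)]
    # Everything after the application file is handed to the application itself
    cmd.append(app)
    cmd += list(app_args)
    return cmd


cmd = build_spark_submit(
    "/src/reports.py",
    jars=[
        "/jars/snowflake-jdbc-3.13.14.jar",
        "/jars/spark-snowflake_2.12-2.10.0-spark_3.1.jar",
    ],
    app_args=["--partitions-output", "4"],
)
# Quote each argument so the line can be pasted into a shell
print(" ".join(shlex.quote(arg) for arg in cmd))
```

Building the command as a list keeps the Spark options and the script's own arguments (here --partitions-output) visibly separate, which is exactly the distinction the original command got wrong.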

answered 2022-03-03T12:49:31.880

Instead of --jars, try --packages=net.snowflake:snowflake-jdbc:3.13.14,net.snowflake:spark-snowflake_2.11:2.9.3-spark_2.4
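
The connector coordinate encodes both the Scala version and the Spark version, so it probably needs to match the Spark build in use. A rough sketch of how the --packages value is put together, assuming the coordinates follow the same pattern as the jar file names in the question (snowflake_packages is a hypothetical helper, and whether a given version combination is actually published should be checked on Maven Central):

```python
def snowflake_packages(jdbc_version, connector_version, scala_version, spark_version):
    """Build a comma-separated Maven coordinate list for spark-submit --packages.

    Hypothetical helper; assumes the naming pattern
    spark-snowflake_<scala>:<connector>-spark_<spark>.
    """
    jdbc = f"net.snowflake:snowflake-jdbc:{jdbc_version}"
    connector = (
        f"net.snowflake:spark-snowflake_{scala_version}:"
        f"{connector_version}-spark_{spark_version}"
    )
    return ",".join([jdbc, connector])


# The value suggested in this answer (Scala 2.11, Spark 2.4)
suggested = snowflake_packages("3.13.14", "2.9.3", "2.11", "2.4")

# The same pattern with the versions listed in the question (Scala 2.12, Spark 3.1)
matching = snowflake_packages("3.13.15", "2.10.0", "2.12", "3.1")

print(suggested)
print(matching)
```

With --packages, Spark resolves the artifacts from a Maven repository at submit time, so the container needs network access for this to work.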

answered 2022-03-01T20:58:34.763