3

我创建了一个 dataproc 集群,并试图提交我的本地作业进行测试。

gcloud beta dataproc clusters create test-cluster \
--region us-central1 \
--zone us-central1-c \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 500 \
--num-workers 2 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 500 \
--image-version preview-ubuntu18 \
--project my-project-id \
--service-account my-service-account@project-id.iam.gserviceaccount.com \
--scopes https://www.googleapis.com/auth/cloud-platform \
--tags dataproc,iap-remote-admin \
--subnet my-vpc \
--properties spark:spark.jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar

尝试提交一个非常简单的脚本

import argparse
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, DataFrame


def load_data(spark: SparkSession):
    customers = spark.read.format('bigquery')\
        .option('table', 'MY_DATASET.MY_TABLE')\
        .load()
    customers.printSchema()
    customers.show()


if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('my-test-app') \
        .getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    load_data(spark)

尝试使用提交作业,都得到了几乎相同的错误:

# 1
gcloud dataproc jobs submit pyspark myscript.py --cluster=test-cluster --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar

# 2
gcloud dataproc jobs submit pyspark myscript.py --cluster=test-cluster --region=us-central1

# 3
gcloud dataproc jobs submit pyspark myscript.py --cluster=test-cluster --region=us-central1 --properties spark:spark.jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar

这是错误消息:

Job [d8adf906970f43d2b348eb89728b2b7f] submitted.
Waiting for job output...
20/11/12 00:33:44 INFO org.spark_project.jetty.util.log: Logging initialized @3339ms
20/11/12 00:33:44 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/11/12 00:33:44 INFO org.spark_project.jetty.server.Server: Started @3431ms
20/11/12 00:33:44 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@3f6c2ca9{HTTP/1.1,[http/1.1]}{0.0.0.0:35517}
20/11/12 00:33:45 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at test-cluster-m/10.154.64.65:8032
20/11/12 00:33:45 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at test-cluster-m/10.154.64.65:10200
20/11/12 00:33:45 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
20/11/12 00:33:45 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/11/12 00:33:45 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/11/12 00:33:45 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/11/12 00:33:47 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1605139742119_0002
Traceback (most recent call last):
  File "/tmp/d8adf906970f43d2b348eb89728b2b7f/vessel_master.py", line 44, in <module>
    load_data(spark)
  File "/tmp/d8adf906970f43d2b348eb89728b2b7f/vessel_master.py", line 14, in load_data
    .option('table', 'MY_DATASET.MY_TABLE')\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o61.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
        at java.util.ServiceLoader.fail(ServiceLoader.java:232)
        at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
        at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
        at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:648)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:213)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
        at com.google.cloud.spark.bigquery.BigQueryUtilScala$.<init>(BigQueryUtil.scala:34)
        at com.google.cloud.spark.bigquery.BigQueryUtilScala$.<clinit>(BigQueryUtil.scala)
        at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:41)
        at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:48)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
        ... 29 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 39 more

听说可能是兼容性问题,我尝试降级集群并使用镜像版本 1.5-debian10,但得到了同样的错误。

任何帮助,将不胜感激

4

1 回答 1

6

Dataproc 预览图像包含带有 Scala 2.12 的 Spark 3。您提到的连接器 jar 基于 Scala 2.11。请将 URL 更改为gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar.

于 2020-11-13T20:52:15.813 回答