
I am using a spark-shell instance to test pulling data from a client's Kafka source. To launch the instance I am using the command spark-shell --jars spark-sql-kafka-0-10_2.11-2.5.0-palantir.8.jar, kafka_2.12-2.5.0.jar, kafka-clients-2.5.0.jar (all jars are present in the working directory).

However, when I run the command val df = spark.read.format("kafka")..........., it crashes after a few seconds with the following:

java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:344)
  at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
  at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
  at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
  at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:304)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 79 more

However, if I change the order of the jars in the spark-shell command to spark-shell --jars kafka_2.12-2.5.0.jar, kafka-clients-2.5.0.jar, spark-sql-kafka-0-10_2.11-2.5.0-palantir.8.jar, it instead crashes with:

java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:376)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateBatchOptions(KafkaSourceProvider.scala:330)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:113)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 55 more

I am working behind a very strict proxy managed by our client and cannot use --packages. I am at a bit of a loss here: can I not load all 3 dependencies at shell launch? Am I missing another step somewhere?


3 Answers


The Structured Streaming + Kafka Integration Guide says:

For experimenting on spark-shell, you need to add the above library and its dependencies too when invoking spark-shell.

The library you are using seems to be customized and not publicly available in the Maven Central repository. That means I cannot look into its dependencies.

However, looking at the latest stable version, 2.4.5, the dependency according to the Maven Central repository is kafka-clients version 2.0.0.
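
So, assuming you can download the jars through your proxy, starting the shell with version-matched artifacts from the stable 2.4.5 line should avoid the missing-class error. A minimal sketch (file names assume the Scala 2.11 builds were downloaded to the working directory):

spark-shell --jars spark-sql-kafka-0-10_2.11-2.4.5.jar,kafka-clients-2.0.0.jar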

Answered 2020-05-26T09:25:31.207

You are trying to import libraries built for different Scala versions: 2.11 and 2.12.

Please add libraries built for the same Scala version, and see below for how to pass them to spark-shell:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.apache.kafka:kafka_2.11:2.4.1,org.apache.kafka:kafka-clients:2.4.1
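
Since --packages is blocked by your proxy, the same artifacts can be passed with --jars once you have downloaded them locally. Note that the list must be comma-separated with no spaces, otherwise the shell splits it into separate arguments. A sketch, assuming the jars sit in the working directory:

spark-shell --jars spark-sql-kafka-0-10_2.11-2.4.5.jar,kafka_2.11-2.4.1.jar,kafka-clients-2.4.1.jar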

Answered 2020-05-26T10:03:04.510

One occasionally disruptive issue is dealing with dependency conflicts in cases where a user application and Spark itself both depend on the same library. This comes up relatively rarely, but when it does, it can be vexing for users. Typically, this will manifest itself when a NoSuchMethodError, a ClassNotFoundException, or some other JVM exception related to class loading is thrown during the execution of a Spark job.

There are two solutions to this problem. The first is to modify your application to depend on the same version of the third-party library that Spark does. The second is to modify the packaging of your application using a procedure that is often called “shading.” The Maven build tool supports shading through advanced configuration of the plug-in shown in Example 7-5 (in fact, the shading capability is why the plugin is named maven-shade-plugin).

Shading allows you to make a second copy of the conflicting package under a different namespace and rewrites your application’s code to use the renamed version. This somewhat brute-force technique is quite effective at resolving runtime dependency conflicts. For specific instructions on how to shade dependencies, see the documentation for your build tool.
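
The paragraph above describes shading with Maven; if you build with sbt instead, the sbt-assembly plugin supports the same relocation technique. A minimal sketch (the shaded.kafka prefix is just an illustrative name):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

// build.sbt: copy the conflicting package under a new namespace
// and rewrite references to it inside the assembled jar
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("org.apache.kafka.**" -> "shaded.kafka.@1").inAll
)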

I would first check the Scala version of your spark-shell, because this can be a Scala version issue:

scala> util.Properties.versionString
res3: String = version 2.11.8

If that is not it, then check which Spark version you are using and the versions of the third-party libraries you are using as dependencies, because there is likely one that is too new or too old for your Spark version.
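
For example, the Spark version can be checked from the same shell (the output shown is just illustrative):

scala> spark.version
res4: String = 2.4.5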

I hope it helps.

Answered 2020-05-26T09:39:13.903