I am trying to read a Kafka topic with Spark 3.0.2. I started a spark-shell session with the following libraries (a sketch of the launch command follows the list):
- spark-sql-kafka-0-10_2.12-3.0.2.jar
- kafka-avro-serializer-6.2.0.jar
- kafka-clients-2.4.1.jar
- spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar
- spark-tags_2.12-3.0.2.jar
- spark-token-provider-kafka-0-10_2.12-3.0.2.jar
- commons-pool2-2.6.2.jar
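For reference, a minimal sketch of how such a session could be launched, assuming all the jars above sit together in one local directory (/path/to/jars is a placeholder, not the actual path used):

cd /path/to/jars   # placeholder directory holding the jars listed above
spark-shell --jars spark-sql-kafka-0-10_2.12-3.0.2.jar,kafka-avro-serializer-6.2.0.jar,kafka-clients-2.4.1.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar,spark-tags_2.12-3.0.2.jar,spark-token-provider-kafka-0-10_2.12-3.0.2.jar,commons-pool2-2.6.2.jar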
I get the following output, together with an error stack trace:
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----------+----------+-------+--------+--------+
|COL1|CUSTOMSREF|MPSIDCCKEY|MPSCOMP|MPSCREF1|MPSCREF2|
+-----+----------+----------+-------+--------+--------+
+-----+----------+----------+-------+--------+--------+
21/07/21 10:14:30 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 20, 172.20.0.4, executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:122)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:59)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactory.java:60)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:342)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:265)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.borrowObject(InternalKafkaConsumerPool.scala:84)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.retrieveConsumer(KafkaDataConsumer.scala:554)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:539)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:285)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:598)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:281)
at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
... 38 more
Does anyone know how to fix it?
Here is the Spark code in Scala:
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Avro schema for the topic, read from a local .avsc file
val schema = new String(Files.readAllBytes(Paths.get("/path/to/avro/schema.avsc")))
val topic = "topic_A"

// Kafka source using SASL_SSL with PLAIN authentication
val streamDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9192")
  .option("subscribe", topic)
  .option("kafka.group.id", "id_A")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/path/to/truststore/certificate.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.plain.PlainLoginModule required username="user_A" password="password";""")
  .option("startingOffsets", "latest")
  .load()

// Deserialize the Avro payload in the value column and flatten it
val dataDf = streamDf
  .selectExpr("CAST(key AS STRING)", "value")
  .select(from_avro(col("value"), schema) as "data")
  .select("data.*")

// Print each micro-batch to the console every 3 seconds
dataDf
  .writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("3 seconds"))
  .start()
Many thanks.