I am trying to read a Kafka topic using Spark 3.0.2. I started a spark-shell with the following libraries (an example launch command follows the list):

  • spark-sql-kafka-0-10_2.12-3.0.2.jar
  • kafka-avro-serializer-6.2.0.jar
  • kafka-clients-2.4.1.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar
  • spark-tags_2.12-3.0.2.jar
  • spark-token-provider-kafka-0-10_2.12-3.0.2.jar
  • commons-pool2-2.6.2.jar

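For reference, a minimal sketch of a spark-shell invocation that loads these jars, assuming they all sit in the current working directory (adjust the paths to wherever the jars actually live):

spark-shell --jars spark-sql-kafka-0-10_2.12-3.0.2.jar,kafka-avro-serializer-6.2.0.jar,kafka-clients-2.4.1.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar,spark-tags_2.12-3.0.2.jar,spark-token-provider-kafka-0-10_2.12-3.0.2.jar,commons-pool2-2.6.2.jar
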
I get the following output, together with an error stack trace:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----------+----------+-------+--------+--------+
| COL1|CUSTOMSREF|MPSIDCCKEY|MPSCOMP|MPSCREF1|MPSCREF2|
+-----+----------+----------+-------+--------+--------+
+-----+----------+----------+-------+--------+--------+

    21/07/21 10:14:30 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 20, 172.20.0.4, executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
            at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:122)
            at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:59)
            at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
            at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
            at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactory.java:60)
            at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
            at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:342)
            at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:265)
            at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.borrowObject(InternalKafkaConsumerPool.scala:84)
            at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.retrieveConsumer(KafkaDataConsumer.scala:554)
            at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:539)
            at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:285)
            at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
            at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:598)
            at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:281)
            at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
            at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
            at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
            at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
            at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
            at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:127)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
            at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
            at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
            ... 38 more

Does anyone know how to solve this?

Here is the Spark code, in Scala:

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import java.nio.file.Files
import java.nio.file.Paths
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col // needed for col() in the select below
import org.apache.spark.sql.streaming.Trigger

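// Read the Avro schema (JSON text) that describes the message values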
val schema = new String(Files.readAllBytes(Paths.get("/path/to/avro/schema.avsc")))
val topic = "topic_A"

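// Streaming read from Kafka over SASL_SSL; options prefixed with "kafka." are passed through to the underlying consumer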
val streamDf = spark
  .readStream.format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9192")
  .option("subscribe", topic)
  .option("kafka.group.id", "id_A")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/path/to/truststore/certificate.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.plain.PlainLoginModule required username="user_A" password="password";""")
  .option("startingOffsets", "latest")
  .load()

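// Cast the key to a string, decode the Avro payload, and flatten its fields into columns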
val dataDf = streamDf.selectExpr("CAST(key AS STRING)", "value")
  .select(from_avro(col("value"), schema) as "data")
  .select("data.*")

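// Print each micro-batch to the console, firing every 3 seconds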
dataDf
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("3 seconds"))
    .start()

Many thanks.

1 Answer

I solved the No resolvable bootstrap urls problem by adding the Kafka brokers' IP addresses to the /etc/hosts file on every node of the Spark cluster. Previously I had only edited /etc/hosts on the master node, so the executors could not resolve the broker hostname (note that the failed task in the trace above runs on executor 1).
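
For illustration, the entry that has to be present on every node maps the hostname used in kafka.bootstrap.servers to the broker's address. A sketch, with a placeholder IP rather than my real one:

# /etc/hosts on every Spark node ("kafka1" matches kafka.bootstrap.servers; IP is a placeholder)
192.0.2.10   kafka1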

Thanks to @koiralo and @OneCricketeer for their suggestions.

answered 2021-07-22T08:45:41.180