I am writing a DataFrame loaded from Parquet to a Kafka topic from a PySpark shell running on YARN (client mode). Every task in the write stage fails with org.apache.kafka.common.errors.TimeoutException: Topic test.location.spark not present in metadata after 60000 ms, even though the topic exists on the brokers.

Environment

  • Hadoop 3.3.1 + Spark 3.1.2
  • OpenJDK 1.8.0
  • Scala 2.12.14

Libraries

  • spark-sql-kafka-0-10_2.12-3.1.2.jar
  • kafka-clients-2.6.0.jar
  • spark-token-provider-kafka-0-10_2.12-3.1.2.jar

Launch the PySpark shell

pyspark --master yarn \
--deploy-mode client \
--driver-memory 16g \
--executor-memory 16g \
--executor-cores 5 \
--num-executors 30 \
--jars \
hdfs://ad-cluster/spark-yarn/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar,\
hdfs://ad-cluster/spark-yarn/jars/kafka-clients-2.6.0.jar,\
hdfs://ad-cluster/spark-yarn/jars/spark-token-provider-kafka-0-10_2.12-3.1.2.jar

PySpark code

>>> kdf = spark.read.parquet("/tmp/loc_kdf_2021080900/")
>>> kafka_topic="test.location.spark"
>>> kafka_servers="10.190.105.124:9092,10.190.105.125:9092,10.190.105.126:9092,10.190.105.149:9092,10.190.105.150:9092,10.190.105.151:9092,10.190.105.152:9092,10.190.105.153:9092,10.190.105.154:9092"
>>> kdf.write.format("kafka").option("kafka.bootstrap.servers",kafka_servers).option("topic",kafka_topic).save()
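
For reference, the Kafka sink expects the DataFrame to carry a value column (string or binary) plus optional key and topic columns; below is a minimal, self-contained sketch of that input shape (column handling is illustrative only, not my actual schema):

# Sketch only: serialize all columns into a single JSON "value" column,
# which is the shape the Kafka sink expects (value, plus optional key/topic).
from pyspark.sql import functions as F

payload = kdf.select(F.to_json(F.struct(*kdf.columns)).alias("value"))

(payload.write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_servers)
    .option("topic", kafka_topic)
    .save())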

Error message:

21/08/10 21:18:12 WARN TaskSetManager: Lost task 12.0 in stage 1.0 (TID 16) (ad-worker2 executor 14): org.apache.kafka.common.errors.TimeoutException: Topic test.location.spark not present in metadata after 60000 ms.
21/08/10 21:19:12 WARN TaskSetManager: Lost task 77.0 in stage 1.0 (TID 152) (ad-worker2 executor 14): org.apache.kafka.common.errors.TimeoutException: Topic test.location.spark not present in metadata after 60000 ms.
21/08/10 21:20:12 WARN TaskSetManager: Lost task 91.1 in stage 1.0 (TID 305) (ad-worker2 executor 14): org.apache.kafka.common.errors.TimeoutException: Topic test.location.spark not present in metadata after 60000 ms.
21/08/10 21:21:12 WARN TaskSetManager: Lost task 151.2 in stage 1.0 (TID 454) (ad-worker2 executor 14): org.apache.kafka.common.errors.TimeoutException: Topic test.location.spark not present in metadata after 60000 ms.
21/08/10 21:21:14 ERROR TaskSetManager: Task 17 in stage 1.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/data/spark/python/pyspark/sql/readwriter.py", line 1107, in save self._jwrite.save()
  File "/data/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/data/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/data/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o54.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 1.0 failed 4 times, most recent failure: Lost task 17.3 in stage 1.0 (TID 458) (ad-worker1 executor 5): org.apache.kafka.common.errors.TimeoutException:
 Topic test.location.spark not present in metadata after 60000 ms.
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1020)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1018)
        at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:70)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test.location.spark not present in metadata after 60000 ms.
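
The 60000 ms in the exception matches the Kafka producer's default max.block.ms. Producer settings can be forwarded to the sink via options with the kafka. prefix, so if the metadata fetch were merely slow rather than failing outright, raising that limit should change the behaviour; an untested sketch of that knob:

# Sketch: options prefixed with "kafka." are forwarded to the underlying
# producer; max.block.ms is the timeout behind the metadata error above.
(kdf.write
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_servers)
    .option("kafka.max.block.ms", "180000")  # producer default is 60000
    .option("topic", kafka_topic)
    .save())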

Topic partition info:

Topic: test.location.spark      PartitionCount: 54      ReplicationFactor: 1    Configs:
        Topic: test.location.spark      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test.location.spark      Partition: 1    Leader: 6       Replicas: 6     Isr: 6
        Topic: test.location.spark      Partition: 2    Leader: 7       Replicas: 7     Isr: 7
        Topic: test.location.spark      Partition: 3    Leader: 3       Replicas: 3     Isr: 3
        Topic: test.location.spark      Partition: 4    Leader: 0       Replicas: 0     Isr: 0
        Topic: test.location.spark      Partition: 5    Leader: 5       Replicas: 5     Isr: 5
        Topic: test.location.spark      Partition: 6    Leader: 2       Replicas: 2     Isr: 2
        Topic: test.location.spark      Partition: 7    Leader: 8       Replicas: 8     Isr: 8
        Topic: test.location.spark      Partition: 8    Leader: 4       Replicas: 4     Isr: 4
        Topic: test.location.spark      Partition: 9    Leader: 1       Replicas: 1     Isr: 1
        Topic: test.location.spark      Partition: 10   Leader: 6       Replicas: 6     Isr: 6
        Topic: test.location.spark      Partition: 11   Leader: 7       Replicas: 7     Isr: 7
        Topic: test.location.spark      Partition: 12   Leader: 3       Replicas: 3     Isr: 3
        Topic: test.location.spark      Partition: 13   Leader: 0       Replicas: 0     Isr: 0
        Topic: test.location.spark      Partition: 14   Leader: 5       Replicas: 5     Isr: 5
        Topic: test.location.spark      Partition: 15   Leader: 2       Replicas: 2     Isr: 2
        Topic: test.location.spark      Partition: 16   Leader: 8       Replicas: 8     Isr: 8
        Topic: test.location.spark      Partition: 17   Leader: 4       Replicas: 4     Isr: 4
        Topic: test.location.spark      Partition: 18   Leader: 1       Replicas: 1     Isr: 1
        ... (partitions 19-53 omitted)
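
Since the brokers clearly know about the topic, my next check would be whether the same Spark session can see it through the same connector at all; a quick batch-read probe (sketch, reusing the variables above):

# Sketch: batch-read the same topic through the same connector; if this
# works, the session can at least resolve the brokers and the topic metadata.
probe = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load())
probe.select("topic", "partition", "offset").show(5, truncate=False)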

Why does the producer inside the Spark executors report that the topic is not present in metadata when the brokers clearly have it, and what do I need to change to make this batch write succeed?