
I am having a problem printing data from a Kafka topic to the console. The error message I receive is shown in the screenshot below. [screenshot: exception in task]

As you can see in the image above, it does not process anything beyond batch 0.

[error screenshot 2]

[error screenshot 3]

All of these are snapshots of the error messages. I don't understand the root cause of the error. Please help me.

Here are the Kafka and Spark versions:

spark version: spark-3.1.1-bin-hadoop2.7
kafka version: kafka_2.13-2.7.0

I am using the following JARs:

kafka-clients-2.7.0.jar 
spark-sql-kafka-0-10_2.12-3.1.1.jar 
spark-token-provider-kafka-0-10_2.12-3.1.1.jar 

Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

spark = SparkSession \
    .builder \
    .appName("Pyspark structured streaming with kafka and cassandra") \
    .master("local[*]") \
    .config("spark.jars","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.executor.extraClassPath","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.executor.extraLibrary","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .config("spark.driver.extraClassPath","file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar,file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar,file:///C://Users//shivani//Desktop//Spark//spark-cassandra-connector-2.4.0-s_2.11.jar,file:///D://mysql-connector-java-5.1.46//mysql-connector-java-5.1.46.jar,file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# streaming dataframe that reads from the kafka topic
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()

print("Printing schema of df_kafka:")
df_kafka.printSchema()

# converting data from the kafka broker to string type
df_kafka_string = df_kafka.selectExpr("CAST(value AS STRING) as value")

# schema to read json format data
ts_schema = StructType() \
    .add("id_str", StringType()) \
    .add("created_at", StringType()) \
    .add("text", StringType())

# parse json data
df_kafka_string_parsed = df_kafka_string.select(from_json(col("value"), ts_schema).alias("twts"))

df_kafka_string_parsed_format = df_kafka_string_parsed.select("twts.*")
df_kafka_string_parsed_format.printSchema()

df = df_kafka_string_parsed_format.writeStream \
    .trigger(processingTime="1 seconds") \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()

df.awaitTermination()

1 Answer


The error (NoClassDefFoundError, followed by the kafka010 package) means that spark-sql-kafka-0-10 is missing its transitive dependency org.apache.commons:commons-pool2:2.6.2, as you can see here.

You can download that JAR as well, or you can change the code to use --packages instead of the spark.jars option and let Ivy handle downloading the transitive dependencies.
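The first option can be sketched as follows. This is just a sketch: the paths mirror the ones in the question, and the commons-pool2 path is hypothetical — point it at wherever you save the downloaded commons-pool2-2.6.2.jar.

```python
# Sketch of option 1: append the missing commons-pool2 JAR to the
# comma-separated list passed to spark.jars. Paths are example values.
jars = [
    "file:///C://Users//shivani//Desktop//Spark//kafka-clients-2.7.0.jar",
    "file:///C://Users//shivani//Desktop//Spark//spark-sql-kafka-0-10_2.12-3.1.1.jar",
    "file:///C://Users//shivani//Desktop//Spark//spark-token-provider-kafka-0-10_2.12-3.1.1.jar",
    # the missing transitive dependency, downloaded manually:
    "file:///C://Users//shivani//Desktop//Spark//commons-pool2-2.6.2.jar",
]
# spark.jars expects a single comma-separated string
spark_jars = ",".join(jars)
```

You would then pass spark_jars to .config("spark.jars", spark_jars) when building the session.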

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache...'

spark = SparkSession.builder...
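A fuller version of the snippet above, assuming Spark 3.1.1 built against Scala 2.12 (adjust the Maven coordinates to your own Spark/Scala versions). Note that the environment variable must be set before the SparkSession is created, and when launching through plain python (rather than spark-submit) it should end with "pyspark-shell":

```python
import os

# Maven coordinates of the Kafka source; assumed versions: Spark 3.1.1,
# Scala 2.12. Ivy will pull commons-pool2 in as a transitive dependency.
packages = ["org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1"]

# Must be set before SparkSession.builder...getOrCreate() runs.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages " + ",".join(packages) + " pyspark-shell"
)
```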
Answered 2021-07-02T14:33:59.937