After installing PySpark and testing it, it worked fine, and I added the correct connector for Kafka integration. Now, when I try to load data from Kafka running on another machine in the same network and start the job, it gets stuck at [*], no error, nothing, and I can't figure out what the problem is here. So please, if anyone can help, here is my code:

import os

# Must be set before the SparkSession is created so the Kafka connector gets pulled in
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'

import findspark

findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'

spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
    .master("local[*]") \
    .getOrCreate()

# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()

print("Printing Schema of df: ")
df.printSchema()


df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
df1.printSchema()

# Schema of the JSON messages in the topic
schema = StructType() \
    .add("name", StringType()) \
    .add("type", StringType())

# Parse the JSON value against the schema and flatten the fields
df2 = df1 \
    .select(from_json(col("value"), schema) \
    .alias("records"), "timestamp")
df3 = df2.select("records.*", "timestamp")

print("Printing Schema of records_df3: ")
df3.printSchema()

# Write the parsed records to the console every 5 seconds
records_write_stream = df3 \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()
records_write_stream.awaitTermination()

print("Stream Data Processing Application Completed.")

(screenshot: the notebook cell stuck at In [*] with no output)
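A cell stuck at In [*] with no error usually means the Kafka consumer is blocked trying to talk to the broker. A minimal sanity check, assuming the broker address from the code above, is to confirm the port is even reachable from the Spark machine before involving Spark at all:

import socket

# Hypothetical reachability probe for the broker used above
broker_host, broker_port = "192.168.1.3", 9092
try:
    with socket.create_connection((broker_host, broker_port), timeout=5):
        print(f"TCP connection to {broker_host}:{broker_port} succeeded")
except OSError as e:
    print(f"Cannot reach {broker_host}:{broker_port}: {e}")

Even when this succeeds, the client can still hang if the broker's advertised.listeners points to localhost instead of an address other machines can reach; that is a common cause of exactly this symptom.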

- The kafka-console-producer command I tried it with:
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test_spark \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

>{"f1": "value1"}
>{"f1": "value2"}
>{"f1": "value3"}
  • When I try to load from Kafka it shows no error and carries on, but when I try to start the job it gets stuck.
  • PS: the strange thing is that I tried running this code while the machine hosting Kafka was shut down, and it still loaded Kafka, i.e. this code ran with no error:
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()
  • and it carries on until it gets stuck again at the last block of code, as shown in the screenshot above, which is strange (see the diagnostic sketch after this list)
  • So please, any suggestions?
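To see whether the stuck query is still initializing its Kafka source or is genuinely waiting for data, the StreamingQuery handle can be polled instead of blocking on awaitTermination(). A minimal sketch, reusing df3 and the writer options from the script above:

import time

records_write_stream = df3 \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()

for _ in range(12):  # watch the query for about a minute
    print(records_write_stream.status)        # e.g. "Initializing sources" vs "Waiting for data to arrive"
    print(records_write_stream.lastProgress)  # None until the first micro-batch completes
    time.sleep(5)

If the status message never gets past initializing the source, the problem is connectivity to the broker rather than anything in the parsing code.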