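After installing PySpark and testing it, it worked fine, and I added the correct connector for Kafka integration. Now, when I try to load data from Kafka running on another machine in the same network and start the job, it gets stuck at [*]: no error, nothing. I don't understand what the problem is here, so I would appreciate any help. This is my code: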
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'
spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
    .master("local[*]") \
    .getOrCreate()
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()
print("Printing Schema of df: ")
df.printSchema()
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
df1.printSchema()
schema = StructType() \
    .add("name", StringType()) \
    .add("type", StringType())
df2 = df1 \
    .select(from_json(col("value"), schema).alias("records"), "timestamp")
df3 = df2.select("records.*", "timestamp")
print("Printing Schema of records_df3: ")
df3.printSchema()
records_write_stream = df3 \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .option("truncate", "false") \
    .format("console") \
    .start()
records_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")
- The Kafka console producer command that I tried:
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test_spark \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
>{"f1": "value1"}
>{"f1": "value2"}
>{"f1": "value3"}
- When I try to load from Kafka, it shows no error and continues, but when I try to start the job, it gets stuck.
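- PS: The strange thing is that I tried running this code while the machine hosting Kafka was shut down, and it still loaded Kafka, i.e. this code ran without any error: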
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()
- It then continued until it got stuck again at the last piece of code, as shown in the picture above, which is strange.
- So, does anyone have any suggestions?
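Since the load step succeeds even with the Kafka machine switched off, it does not show whether the broker is reachable at all. A minimal sketch of a check that could be run from the Spark machine first is below. The helper name can_reach is hypothetical (it is not part of PySpark or Kafka), and it only tests whether a plain TCP connection to the broker port can be opened, not whether Kafka itself answers correctly:

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    Hypothetical helper for illustration only: it checks network reachability
    of the port, not that a working Kafka broker is listening behind it.
    """
    try:
        # create_connection resolves the host and attempts a TCP connect.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

With the values from the code above, this would be called as can_reach("192.168.1.3", 9092). A False result would point to a network or firewall problem rather than to the Spark code; a True result only shows that the port is open.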