
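Below is my first program using Kafka and PySpark. The code seems to run without any exceptions, but the output of my query is empty.

I start Spark and Kafka. Then, on the Kafka side, I subscribed to topic = "quickstart-events" and produced messages to that topic from the terminal. But when I run this code, it gives me empty DataFrames.

How do I resolve this?

Code: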

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, DataFrame
from pyspark.sql.types import StructType, ArrayType, StructField, IntegerType, StringType, DoubleType

spark = SparkSession.builder \
    .appName("Spark-Kafka-Integration") \
    .master("local[2]") \
    .getOrCreate()

dsraw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart-events") \
    .load()

ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

rawQuery = dsraw \
    .writeStream \
    .queryName("query1") \
    .format("memory") \
    .start()

raw = spark.sql("select * from query1")
raw.show()  # empty output

rawQuery = ds \
    .writeStream \
    .queryName("query2") \
    .format("memory") \
    .start()

raw = spark.sql("select * from query2")
raw.show()  # empty output
print("complete")

Output:

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

+---+-----+
|key|value|
+---+-----+
+---+-----+

1 Answer


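If you are just learning and experimenting with Kafka and Spark streaming, this is fine.

Just use:

    import time

    while True:
        time.sleep(5)
        print("queryresult")
        raw.show()  # starts printing rows once the query has processed data

instead of a single call:

    raw.show()  # runs only once, which is why no result is printed

Do not use this in production code.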
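For a quick experiment, the endless loop above can be given a time limit. The following is only a minimal sketch: `poll_until_rows` and its parameters are hypothetical names, not part of Spark, and the helper simply retries a callable until it returns rows or a deadline passes.

```python
import time
from typing import Callable, Optional, Sequence


def poll_until_rows(
    fetch_rows: Callable[[], Sequence],
    timeout_s: float = 30.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[Sequence]:
    """Call fetch_rows until it returns a non-empty result or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        rows = fetch_rows()
        if rows:
            return rows  # the sink has received data
        if time.monotonic() >= deadline:
            return None  # give up instead of looping forever
        sleep(interval_s)
```

With the memory sink from the question, it could be called as `poll_until_rows(lambda: spark.sql("select * from query2").collect())`, assuming `spark` and the `query2` table exist as in the question's code.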

It is better to write:

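from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark-Kafka-Integration") \
    .master("local[2]") \
    .getOrCreate()

# Subscribe to the Kafka topic as a streaming source
dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "quickstart-events") \
    .load()

# Kafka delivers key and value as binary, so cast both to strings
ds = dsraw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Print each micro-batch to the console as new rows arrive
rawQuery = ds \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# Block so the streaming query keeps running
rawQuery.awaitTermination()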

It will automatically print the results to the console.
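One more thing may be worth checking, although the question does not say exactly when the messages were produced: for streaming queries, Spark's Kafka source starts from the latest offsets by default, so messages produced before the query started are not read. The sketch below builds the source options with `startingOffsets` set explicitly. `kafka_source_options` is a hypothetical helper name, and the server and topic values are the ones from the question.

```python
from typing import Dict


def kafka_source_options(
    bootstrap_servers: str, topic: str, from_beginning: bool = True
) -> Dict[str, str]:
    """Build options for Spark's Kafka streaming source."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # "earliest" replays messages already in the topic;
        # "latest" is the default for streaming queries
        "startingOffsets": "earliest" if from_beginning else "latest",
    }


options = kafka_source_options("kafka:9092", "quickstart-events")

# With a SparkSession named `spark`, the options could be applied as:
# dsraw = spark.readStream.format("kafka").options(**options).load()
```

`startingOffsets` only takes effect when a query starts fresh; a query resumed from a checkpoint continues from where it left off.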

answered 2021-09-13T14:38:10.447