1

在下面的代码片段中,我尝试将温度 DStream(从 Kafka 接收)转换为 pandas Dataframe。

def main_process(time, dStream):
print("========= %s =========" % str(time))

try:
    # Get the singleton instance of SparkSession
    spark = getSparkSessionInstance(dStream.context.getConf())

    # Convert RDD[String] to RDD[Row] to DataFrame
    rowRdd = dStream.map(lambda t: Row(Temperatures=t))

    df = spark.createDataFrame(rowRdd)

    df.show()

    print("The mean is: %m" % df.mean())

照原样,从未计算过平均值,我想这是因为“df”不是熊猫数据框(?)。

我尝试df = spark.createDataFrame(df.toPandas())根据相关文档使用,但编译器无法识别“toPandas()”,并且从未发生转换。

我是否走在正确的道路上,如果是,我应该如何应用转型?

或者也许我的方法是错误的,我必须以不同的方式处理 DStream?

先感谢您!

4

0 回答 0