3

在我的应用程序中,我从 Kafka 队列中获取了一个帐户流(使用带有 kafka 的 Spark 流)

而且我需要从 S3 获取与这些帐户相关的属性,因此我计划缓存 S3 结果数据帧,因为 S3 数据现在至少一天不会更新,它可能会在未来很快更改为 1 小时或 10 分钟。所以问题是如何在不停止进程的情况下定期刷新缓存的数据帧。

**更新:我计划在 S3 中有更新时将事件发布到 kafka,使用 SNS 和 AWS lambda,我的流应用程序将订阅该事件并根据此事件刷新缓存的数据帧(基本上是 unpersist()cache 和从 S3 重新加载)这是一个好方法吗?

4

2 回答 2

5

最近在Spark 邮件列表上提出了这个问题

据我所知,完成您所要求的唯一方法是在新数据到达时从 S3 重新加载 DataFrame,这意味着您还必须重新创建流 DF 并重新启动查询。这是因为 DataFrame 基本上是不可变的。

如果您想在不重新加载 DataFrame 的情况下更新(变异)数据,您需要尝试与 Spark 集成或连接到 Spark并允许变异的数据存储之一。我知道的一个是SnappyData

于 2017-07-24T20:07:50.630 回答
0

实现的最简单方法,下面的代码读取每个批次的维度数据文件夹,但请记住新的维度数据值(在我的情况下为国家名称)必须是一个新文件。

package com.databroccoli.streaming.dimensionupateinstreaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object RefreshDimensionInStreaming {

  def main(args: Array[String]) = {

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")

    var countryDf: Option[DataFrame] = None: Option[DataFrame]

    def updateDimensionDf() = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")

      if (countryDf != None) {
        countryDf.get.unpersist()
      }

      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))

      countryDf.get.show()
    }

    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)

        updateDimensionDf()

        batchDF
          .join(
            countryDf.get,
            expr(
              """
      countrycode_2 = countrycode 
      """
            ),
            "leftOuter"
          )
          .show

      }
      .start()
      .awaitTermination()

  }

}
于 2021-03-19T16:53:01.490 回答