1

我正在尝试使用 Spark 2.3.0 实现流连接玩具

当条件匹配时,流连接工作正常,但即使使用 leftOuterJoin,条件不匹配时也会丢失左流值。

提前致谢

这是我的源代码和数据,基本上,我正在创建两个套接字,一个是 9999 作为右流源,一个是 9998 作为左流源。

val spark = SparkSession
      .builder
      .appName("StreamStream")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("ERROR")

    val s9999: DataFrame = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val s9999Dataset: Dataset[S9999] = s9999
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9999(id, time)
      })
      .withWatermark("timestamp99", "30 seconds")

    val s9998Dataset: Dataset[S9998] = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9998)
      .load()
      .map(line => {
        val strings = line.get(0).toString.split(",")
        val id = strings(0).toInt
        val time = Timestamp.valueOf(strings(1))
        S9998(id, time)
      })

    val resultDataset = s9998Dataset
      .join(s9999Dataset,
        joinExprs = expr(
          """
                id99 = id98 AND
                timestamp98 >= timestamp99 AND
                timestamp98 <= timestamp99 + interval 6 seconds
        """),
        joinType = "leftOuter")

    val streamingQuery = resultDataset
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    streamingQuery.awaitTermination()
  }

  case class S9999(id99: Int, timestamp99: Timestamp)

  case class S9998(id98: Int, timestamp98: Timestamp)

样本数据:

左插座:

1,2011-10-02 18:50:20.123
2,2011-10-02 18:50:25.123
3,2011-10-02 18:50:30.123
4,2011-10-02 18:50:35.123
5,2011-10-02 18:50:40.123
6,2011-10-02 18:50:45.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
10,2011-10-02 18:51:05.123
11,2011-10-02 18:51:10.123
12,2011-10-02 18:51:15.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123

正确的流数据:

1,2011-10-02 18:50:20.123
3,2011-10-02 18:50:30.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
4

1 回答 1

0

在这个问题上花了6个小时后,我发现左侧可选水印实际上是强制性的

于 2021-02-28T07:35:17.183 回答