2

我的 Spark 2.4.x (pyspark) 应用需要:

  1. 输入是两个 Kafka 主题,输出是一个 Kafka 主题
  2. 一个“流表”,其中
    • 有一个逻辑键和
    • 剩余的列应该是来自任一流的最新值。
  3. 亚秒级延迟。测试表明,watermarks在不使用时这是可以实现的。

这似乎是一个基本的事情,但它并不完全适合我。


例子:

注意:在下面的示例中,T1、T2 和 T2 时间点可能相隔秒/分钟/小时。

T1)在时间T1

KafkaPriceTopic获得 1 个消息负载(我们称之为P1):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"1", "PriceCol": "101.5"}

带有有效负载的KafkaVolumeTopic 1 消息(我们称之为V1):
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"1", "VolumeCol": "50"}

我想要一个看起来像这样 的结果:DataFrame

+-----------+--------+---------+-------------+--------------+ 
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|  
+-----------+--------+---------+-------------+--------------+ 
|Sec1       |101.5   |50       |1            |1             |
+-----------+--------+---------+-------------+--------------+ 

T2) KafkaPriceTopic 1 消息(P2):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"2", "PriceCol": "101.6"}

结果DataFrame

+-----------+--------+---------+-------------+--------------+ 
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|  
+-----------+--------+---------+-------------+--------------+ 
|Sec1       |101.6   |50       |2            |1             |
+-----------+--------+---------+-------------+--------------+ 

注意P1不再相关

T3) KafkaVolumeTopic 1 消息V2
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"2", "VolumeCol": "60"}

结果DataFrame

+-----------+--------+---------+-------------+--------------+ 
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1       |101.6   |60       |2            |2             |
+-----------+--------+---------+-------------+--------------+ 

注意P1V1不再相关


什么有效

  1. get_json_object从有效负载(现在)中 提取 json ,join这两个主题的流。
  2. 然而。这将产生(w/o watermark)a ,它包含Sec1DataFrame收到的所有价格和交易量,而不仅仅是最新的。
  3. 所以这后面是一个groupBy(...).agg(last(...),...). 但是我坚持只获得最新值的一行。
    dfKafka1 = spark.readStream.format("kafka"). #remaining options etc
                .load()
                .select(...)                     #pulls out fields as columns" 

    dfKafka2 = spark.readStream.format("kafka"). #remaining options etc
               .load()
               .select(...)                     #pulls out fields as columns" 

    dfResult=dfKafka1.join(dfKafka2,"SecurityCol")

#structured streaming doesnt yet allow groupBy after a join, so write to intermediate kafka topic

   dfResult.writestream.format("kafka"). #remaining options
           .trigger(processingTime="1 second")
           .start()

#load intermediate kafka topic
   dfKafkaResult=spark.readStream.format("kafka"). #remaining options
                 .load()
                 .select(...)                      #get_json_object for cols
                 .groupBy("SecurityCol")           #define the "key" to agg cols
                 .agg(last("PriceCol"),            #most recent value per col
                      last("PriceSeqNoCol"),
                      last("VolumeCol"),
                      last("VolumeSeqNoCol"))


问题

然而,最后的agg&last()并没有始终如一地做到这一点。

  1. 当 KafkaVolumeTopic 收到一条新消息时,结果可能会与来自 KafkaPriceTopic 的旧消息连接。
  2. 进一步orderBy的 /sort 不能在没有聚合的流上使用。

限制

  1. 我不能在那groupBy之前join因为那需要withWatermark,而且我认为我的应用程序不能使用watermark。理由:
    • 该应用程序应该能够在一天中的任何时间加入给定 SecurityCol 的两个主题。
      • 如果 PriceTopic 在上午 9 点收到消息,而 VolumeTopic 在上午 10 点收到消息
      • 我希望两人能加入并出席
    • 水印限制何时以append模式发出数据。所以不能在这里使用水印,因为时间范围是一整天。

有任何想法吗?

4

0 回答 0