我的 Spark 2.4.x (pyspark) 应用需要:
- 输入是两个 Kafka 主题,输出是一个 Kafka 主题
- 一个“流表”,其中
- 有一个逻辑键和
- 剩余的列应该是来自任一流的最新值。
- 亚秒级延迟。测试表明,
watermarks
在不使用时这是可以实现的。
这似乎是一个基本的事情,但它并不完全适合我。
例子:
注意:在下面的示例中,T1、T2 和 T2 时间点可能相隔秒/分钟/小时。
T1)在时间T1
KafkaPriceTopic获得 1 个消息负载(我们称之为P1):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"1", "PriceCol": "101.5"}
带有有效负载的KafkaVolumeTopic 1 消息(我们称之为V1):
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"1", "VolumeCol": "50"}
我想要一个看起来像这样 的结果:DataFrame
+-----------+--------+---------+-------------+--------------+
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1 |101.5 |50 |1 |1 |
+-----------+--------+---------+-------------+--------------+
T2) KafkaPriceTopic 1 消息(P2):
{ "SecurityCol":"Sec1", "PriceSeqNoCol":"2", "PriceCol": "101.6"}
结果DataFrame
+-----------+--------+---------+-------------+--------------+
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1 |101.6 |50 |2 |1 |
+-----------+--------+---------+-------------+--------------+
注意:P1不再相关
T3) KafkaVolumeTopic 1 消息V2:
{ "SecurityCol":"Sec1", "VolumeSeqNoCol":"2", "VolumeCol": "60"}
结果DataFrame
+-----------+--------+---------+-------------+--------------+
|SecurityCol|PriceCol|VolumeCol|PriceSeqNoCol|VolumeSeqNoCol|
+-----------+--------+---------+-------------+--------------+
|Sec1 |101.6 |60 |2 |2 |
+-----------+--------+---------+-------------+--------------+
注意:P1和V1不再相关
什么有效
get_json_object
从有效负载(现在)中 提取 json ,join
这两个主题的流。- 然而。这将产生(w/o
watermark
)a ,它包含Sec1DataFrame
收到的所有价格和交易量,而不仅仅是最新的。 - 所以这后面是一个
groupBy(...).agg(last(...),...)
. 但是我坚持只获得最新值的一行。
dfKafka1 = spark.readStream.format("kafka"). #remaining options etc
.load()
.select(...) #pulls out fields as columns"
dfKafka2 = spark.readStream.format("kafka"). #remaining options etc
.load()
.select(...) #pulls out fields as columns"
dfResult=dfKafka1.join(dfKafka2,"SecurityCol")
#structured streaming doesnt yet allow groupBy after a join, so write to intermediate kafka topic
dfResult.writestream.format("kafka"). #remaining options
.trigger(processingTime="1 second")
.start()
#load intermediate kafka topic
dfKafkaResult=spark.readStream.format("kafka"). #remaining options
.load()
.select(...) #get_json_object for cols
.groupBy("SecurityCol") #define the "key" to agg cols
.agg(last("PriceCol"), #most recent value per col
last("PriceSeqNoCol"),
last("VolumeCol"),
last("VolumeSeqNoCol"))
问题
然而,最后的agg
&last()
并没有始终如一地做到这一点。
- 当 KafkaVolumeTopic 收到一条新消息时,结果可能会与来自 KafkaPriceTopic 的旧消息连接。
- 进一步
orderBy
的 /sort 不能在没有聚合的流上使用。
限制
- 我不能在那
groupBy
之前join
因为那需要withWatermark
,而且我认为我的应用程序不能使用watermark
。理由:- 该应用程序应该能够在一天中的任何时间加入给定 SecurityCol 的两个主题。
- 如果 PriceTopic 在上午 9 点收到消息,而 VolumeTopic 在上午 10 点收到消息
- 我希望两人能加入并出席
- 水印限制何时以
append
模式发出数据。所以不能在这里使用水印,因为时间范围是一整天。
- 该应用程序应该能够在一天中的任何时间加入给定 SecurityCol 的两个主题。
有任何想法吗?