1

我正在从 Kafka 接收 DStream,我想通过按键将所有消息分组在某个滑动窗口中。

关键是这个窗口需要基于每条消息中提供的时间戳(单独的字段):

Message structure
--------------------------
key1, ..., ..., 1557678233
key1, ..., ..., 1557678234 
key2, ..., ..., 1557678235 

所以,我想考虑每个键的消息timestamp of the first message- timestamp of the last message<= 5 分钟

正如我从这个问题中看到的那样,这是不可行的,因为 Spark 只计算事件的系统时间。那边的人建议使用updateStateByKey,这对我来说不是很清楚......

也许我们可以使用另一种方法来实现这一点?

如果在函数中包含时间戳的差异,combinerscombineByKey通过持续时间阈值进一步求和和过滤呢?

如果您有机会遇到同样的问题,请添加您的想法,或者分享您的解决方案......

谢谢!

4

2 回答 2

1

可能吗?毫无疑问。Apache Beam等提供Apache Spark 后端可以轻松处理此类操作。

但是,这绝对不是您想要自己实现的东西,除非您拥有大量的开发资源和大量可用的专有技术。如果你有,你可能一开始就不会问这个问题。

处理延迟事件、乱序事件以及从节点故障中恢复可能是最棘手的,因为边缘情况的数量更多。

此外,在您实际实现它之前它就会过时 -DStream已经被认为是一个遗留 API,并且很可能很快就会达到它的生命周期结束。同时结构化流已经可以处理开箱即用的事件时间窗口。

于 2019-06-22T00:15:38.147 回答
0

使用以下示例数据进行测试,我假设时间戳采用纪元格式 -

[key1, ..., ..., 1557678233]
[key1, ..., ..., 1557678234]
[key2, ..., ..., 1557678235]
[key2, ..., ..., 1557678240]
[key2, ..., ..., 1557678271]
[key3, ..., ..., 1557678635]
[key3, ..., ..., 1557678636]
[key3, ..., ..., 1557678637]
[key3, ..., ..., 1557678638]
[key3, ..., ..., 1557678999]

//-- 如果记录需要处理或拒绝,则创建 udf 以返回

scala> spark.udf.register("recordStatusUDF", (ts:String) => {
     |     val ts_array = ts.split(",",-1)
     |     if ((ts_array.max.trim.toLong - ts_array.min.trim.toLong) <= 300) {
     |        "process"
     |     }
     |     else { "reject" }
     | })
res83: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

//-- 创建模式

scala> val schema = StructType(Seq(StructField("key", StringType, true),StructField("col2", StringType, true),StructField("col3", StringType, true),StructField("epoch_ts", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(key,StringType,true), StructField(col2,StringType,true), StructField(col3,StringType,true), StructField(epoch_ts,StringType,true))

//-- 创建数据框

scala> spark.createDataFrame(rdd,schema).createOrReplaceTempView("kafka_messages")


scala> spark.sql(s""" select x.key, recordStatusUDF(x.ts) as action_ind from ( select key, concat_ws(",", collect_list(epoch_ts)) as ts from kafka_messages group by key)x """).createOrReplaceTempView("action")

scala> val result = spark.sql(s""" select km.* from kafka_messages km inner join action ac on km.key = ac.key and ac.action_ind = "process" """)
result: org.apache.spark.sql.DataFrame = [key: string, col2: string ... 2 more fields]

scala> result.show(false)
+----+----+----+-----------+
|key |col2|col3|epoch_ts   |
+----+----+----+-----------+
|key1| ...| ...| 1557678233|
|key1| ...| ...| 1557678234|
|key2| ...| ...| 1557678235|
|key2| ...| ...| 1557678240|
|key2| ...| ...| 1557678271|
+----+----+----+-----------+

您可以在每个 rdd(kafka 消息)上使用上述代码。希望这会有所帮助。

于 2019-06-26T20:11:38.923 回答