我是流数据处理的新手,我觉得必须是一个非常基本的用例。
假设我有一个(User, Alert)
元组流。我想要的是对每个用户的流进行速率限制。即我想要一个只为用户输出一次警报的流。在接下来的 60 分钟内,用户的任何传入警报都应该被吞下。在这 60 分钟之后,应再次触发传入警报。
我尝试了什么:
用作aggregate
有状态转换,但聚合状态与时间相关。然而,即使结果KTable
的聚合值没有变化,KTable(作为变更日志)将继续向下发送元素,因此无法达到“限速”流的预期效果
val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
.groupBy((key, string) => string)
.aggregate(() => "constant",
(aggKey: String, value: String, aggregate: String) => aggregate,
stringSerde,
"name")
.print
这提供了以下输出:
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
我通常不清楚如何/何时aggregate
决定在下游发布元素。我最初的理解是它是立竿见影的,但似乎并非如此。据我所知,窗口在这里不应该有帮助。
Kafka Streams DSL 目前是否可能不考虑这种有状态转换的用例,类似于 Spark 的updateStateByKey或 Akka 的statefulMapConcat?我是否必须使用较低级别的处理器/变压器 API?
编辑:
可能的重复确实涉及记录缓存如何导致聚合何时决定向下游发布元素的问题。然而,主要问题是如何在 DSL 中实现“速率限制”。正如@miguno 指出的那样,必须恢复到较低级别的处理器 API。下面我粘贴了非常冗长的方法:
val logConfig = new util.HashMap[String, String]();
// override min.insync.replicas
logConfig.put("min.insyc.replicas", "1")
case class StateRecord(alert: Alert, time: Long)
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.enableLogging(logConfig)
.build();
builder.addStateStore(countStore)
class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
var context: ProcessorContext = null;
var store: KeyValueStore[Integer, StateRecord] = null;
override def init(context: ProcessorContext) = {
this.context = context
this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
}
override def transform(key: Integer, value: Alert) = {
val current = System.currentTimeMillis()
val newRecord = StateRecord(value._1, value._2, current)
store.get(key) match {
case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
store.put(key, newRecord)
(key, value)
}
case StateRecord(_, _) => null
case null => {
store.put(key, newRecord)
(key, value)
}
}
}
}