2

我是流数据处理的新手,我觉得必须是一个非常基本的用例。

假设我有一个(User, Alert)元组流。我想要的是对每个用户的流进行速率限制。即我想要一个只为用户输出一次警报的流。在接下来的 60 分钟内,用户的任何传入警报都应该被吞下。在这 60 分钟之后,应再次触发传入警报。

我尝试了什么:

用作aggregate有状态转换,但聚合状态与时间相关。然而,即使结果KTable的聚合值没有变化,KTable(作为变更日志)将继续向下发送元素,因此无法达到“限速”流的预期效果

val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
  .groupBy((key, string) => string)
  .aggregate(() => "constant",
    (aggKey: String, value: String, aggregate: String) => aggregate,
    stringSerde,
    "name")
  .print

这提供了以下输出:

[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)

我通常不清楚如何/何时aggregate决定在下游发布元素。我最初的理解是它是立竿见影的,但似乎并非如此。据我所知,窗口在这里不应该有帮助。

Kafka Streams DSL 目前是否可能不考虑这种有状态转换的用例,类似于 Spark 的updateStateByKey或 Akka 的statefulMapConcat?我是否必须使用较低级别的处理器/变压器 API?

编辑:

可能的重复确实涉及记录缓存如何导致聚合何时决定向下游发布元素的问题。然而,主要问题是如何在 DSL 中实现“速率限制”。正如@miguno 指出的那样,必须恢复到较低级别的处理器 API。下面我粘贴了非常冗长的方法:

  val logConfig = new util.HashMap[String, String]();
  // override min.insync.replicas
  logConfig.put("min.insyc.replicas", "1")

  case class StateRecord(alert: Alert, time: Long)

  val countStore = Stores.create("Limiter")
    .withKeys(integerSerde)
    .withValues(new JsonSerde[StateRecord])
    .persistent()
    .enableLogging(logConfig)
    .build();
  builder.addStateStore(countStore)

  class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
    var context: ProcessorContext = null;
    var store: KeyValueStore[Integer, StateRecord] = null;

    override def init(context: ProcessorContext) = {
      this.context = context
      this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
    }

    override def transform(key: Integer, value: Alert) = {
      val current = System.currentTimeMillis()
      val newRecord = StateRecord(value._1, value._2, current)
      store.get(key) match {
        case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
          store.put(key, newRecord)
          (key, value)
        }
        case StateRecord(_, _) => null
        case null => {
          store.put(key, newRecord)
          (key, value)
        }
      }
    }
  }
4

1 回答 1

2

假设我有一个(User, Alert)元组流。我想要的是对每个用户的流进行速率限制。即我想要一个只为用户输出一次警报的流。在接下来的 60 分钟内,用户的任何传入警报都应该被吞下。在这 60 分钟之后,应再次触发传入警报。

目前在使用 Kafka Streams 的 DSL 时这是不可能的。相反,您可以(并且需要)使用较低级别的处理器 API 手动实现此类行为。

仅供参考:我们一直在 Kafka 社区讨论是否将此类功能(通常称为“触发器”)添加到 DSL。到目前为止,决定暂时不使用此类功能。

我通常不清楚如何/何时aggregate决定在下游发布元素。我最初的理解是它是立竿见影的,但似乎并非如此。

是的,这是 Kafka 0.10.0.0 的初始行为。从那时起(不确定您使用的是什么版本)我们引入了记录缓存;如果您禁用记录缓存,您将恢复初始行为,尽管据我了解,记录缓存会给您某种(间接)速率限制旋钮。因此,您可能希望保持启用缓存。

不幸的是,Apache Kafka 文档还没有涵盖记录缓存,与此同时,您可能想改为阅读http://docs.confluent.io/current/streams/developer-guide.html#memory-management

于 2017-02-02T11:10:54.637 回答