1

与此问题类似但略有不同:KStream 批处理窗口,我想在将消息KStream推送给消费者之前对来自 a 的消息进行批处理。

但是,此下推不应安排在固定的时间窗口上,而应安排在每个键的固定消息计数阈值上。

首先想到两个问题:

1)这是AbstractProcessor应该处理的方式吗?类似于以下内容:

@Override
public void punctuate(long streamTime) {
    KeyValueIterator<String, Message[]> it = messageStore.all();
    while (it.hasNext()) 
        KeyValue<String, Message[]> entry = it.next();
        if (entry.value.length > 10) {
            this.context.forward(entry.key, entry.value);
            entry.value = new Message[10]();
        }
    }
}

2)由于StateStore可能会爆炸(如果条目值永远不会达到阈值以便被转发),“垃圾收集”的最佳方法是什么?我可以制定一个基于时间的计划并删除太旧的密钥……但这看起来非常 DIY 并且容易出错。

4

1 回答 1

2

我想这会奏效。应用基于时间的“垃圾收集”听起来也很合理。是的,使用处理器 API 代替 DSL 有点 DIY 的味道——这首先不是 PAPI 的目的(授权用户做任何需要的事情)。

不过有几点评论:

  • 您将需要一个更复杂的数据结构:因为punctuate()是根据流时间进度调用的,所以在两次调用之间,一个键的记录可能超过 10 条。因此,您需要KeyValueIterator<String, List<Message[]>> it = messageStore.all();能够为每个键存储多个批次的东西。
  • 我假设您需要微调 punctuate 的时间表,这会很棘手 - 如果您的时间表太紧,许多批次可能还没有完成并且您浪费 CPU - 如果您的时间表太松,您将需要大量内存,您的下游操作员将获得大量数据,因为您一次发出大量内容。向下游发送突发数据可能会成为问题。
  • 扫描整个商店的成本很高——尝试根据批量大小对键值对进行“排序”似乎是个好主意。这应该使您只能触摸已完成批次的键,而不是所有键。也许您可以保留一个已完成批次的键的内存列表,并且只查找那些(失败时,您需要对存储中的所有键进行一次传递以重新创建此内存列表)。
于 2016-12-02T17:36:34.463 回答