5

给定以下代码:

KStream<String, Custom> stream =  
    builder.stream(Serdes.String(), customSerde, "test_in");

stream
    .groupByKey(Serdes.String(), customSerde)
    .reduce(new CustomReducer(), "reduction_state")
    .print(Serdes.String(), customSerde);

println在 Reducer 的 apply 方法中有一个语句,当我预计会发生缩减时,它会成功打印出来。但是,上面显示的最终打印语句什么也不显示。同样,如果我使用to方法而不是print,我在目标主题中看不到任何消息。

在reduce语句之后我需要什么才能看到减少的结果?如果将一个值推送到输入,我不希望看到任何东西。如果按下具有相同键的第二个值,我希望减速器应用(它确实如此),并且我还希望减少的结果继续到处理管道的下一步。如前所述,我在管道的后续步骤中没有看到任何内容,我不明白为什么。

4

1 回答 1

10

从 Kafka 开始,0.10.1.0所有聚合算子都使用内部重复数据删除缓存来减少结果 KTable 更改日志流的负载。例如,如果您直接计算并处理具有相同键的两条记录,则完整的变更日志流将为<key:1>, <key:2>.

使用新的缓存功能,缓存将接收<key:1>并存储它,但不会立即将其发送到下游。计算时<key:2>,它会替换缓存的第一个条目。根据缓存大小、不同键的数量、吞吐量和您的提交间隔,缓存会向下游发送条目。这发生在缓存逐出单个键条目或完全刷新缓存(将所有条目发送到下游)时。因此,KTable 更改日志可能只显示<key:2>(因为<key:1>已删除重复)。

您可以通过 Streams 配置参数控制缓存的大小StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG。如果将该值设置为零,则完全禁用缓存,并且 KTable 更改日志将包含所有更新(有效地提供预0.10.0.0行为)。

Confluent 文档包含一个更详细地解释缓存的部分:

于 2016-11-11T21:11:29.190 回答