1

我有一个 JSON 对象流,我键入几个值的散列。我希望在 n 秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析。

我的拓扑:K->aggregateByKey(n seconds)->process()

process - init()我调用的步骤ProcessorContent.schedule(60 * 1000L)中,希望.punctuate()得到调用。从这里我将遍历内部散列中的值并采取相应的行动。

我看到值来自聚合步骤并命中process()函数,但从.punctuate()未被调用。


代码:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
            new AggregateInit(),
            new OpxAggregate(),
            TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
                            @Override
                            public Processor<Windowed<String>, String> get() {
                                 return new AggProcessor();
                            }
                       });
    
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit()返回空值。

我想我可以.punctuate()用一个简单的计时器来做同样的事情,但我想知道为什么这段代码不能像我希望的那样工作。

4

1 回答 1

0

我认为这与 kafka 集群的设置不正确有关。将文件描述符计数更改为比默认值(1024 -> 65535)高得多的值后,这似乎符合规范。

于 2016-09-20T18:22:31.760 回答