0

我正在批处理模式下使用消息。我想每 250 毫秒从流中提取 8 条消息。

spring:  
  cloud:
    stream:
      kinesis:
        bindings:
          input:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 250
              recordsLimit: 8
      bindings:
        input:
          group: my-group
          destination: stream
          content-type: application/json

我已经将大约 100 条消息推送到流中,并且我启动了消费者。

根据配置,我应该每 250 毫秒提取一次消息。但是轮询器不是每 250 毫秒拉一次消息。

@StreamListener(Sink.INPUT)
  public void receiveMessage(Message<List<byte[]>> messages) {
    log.info("Total received messages: " + messages.getPayload().size());
  }

2019-04-27 12:04:40.145 : Total received messages: 8
2019-04-27 12:04:41.604 : Total received messages: 8
2019-04-27 12:04:43.167 : Total received messages: 8
2019-04-27 12:04:44.618 : Total received messages: 8
2019-04-27 12:04:46.145 : Total received messages: 8
2019-04-27 12:04:47.775 : Total received messages: 8
2019-04-27 12:04:49.211 : Total received messages: 8
2019-04-27 12:04:50.756 : Total received messages: 8
2019-04-27 12:04:52.283 : Total received messages: 8
2019-04-27 12:04:53.817 : Total received messages: 8

我什至根本不处理任何东西。它只是日志。

每条消息之间的时间超过 250 毫秒。我有什么遗漏吗。

4

1 回答 1

0

250 ms 对于实际用例没有意义。也许您可以将间隔增加到 5 秒并尝试一下。

于 2019-04-29T09:01:47.063 回答