0

我正在使用 spring kinesis binder 以批处理模式消费消息

有时我无法使用流中的消息。这并非一直发生

[ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49594358705006691330463332232285735104253344290967650306', 时间戳=null, 流='mystream-1', shard='00000}']=false}的记录在序列号 [null] 上。暂停消费 [1000] 毫秒。

我不断收到此消息,并且消息没有被消耗。

我的 Conf 如下所示

 spring:
  cloud:
    stream:
      bindings:
        input:
          group: mygroup
          destination: mystream-1
          content-type: application/json
        output:
          destination: mystream-2
          content-type: application/json
      kinesis:        
        bindings:
          input:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 60000
              consumer-backoff: 1000
        binder:
          headers: x-item_id,x-message_type
          locks:
            table: lTLocks
            leaseDuration: 30
            refreshPeriod: 3000
          checkpoint:
            table: lTCheckPoints

流中有消息,但我不会间歇性地消费。你能帮忙吗?

4

1 回答 1

0

我认为您在 Kinesis 流中的检查点存储和分片包含不同的序列号。

lTCheckPoints请考虑在开始从流中消费之前使用清理表。

我并不是说你需要一直这样做,但至少为了干净的测试环境。

于 2019-04-16T14:26:53.910 回答