0

spring-integration-aws我有一个使用version开发的 kinesis 消费者应用程序1.1.0.RELEASE

在我的测试中,我在同一个消费者组中运行此应用程序的两个实例,并从具有两个分片的流中消费。在我的测试中,我意识到KinesisMessageDrivenChannelAdapter会以三种方式分发消息:

  1. 所有消息传递给一个消费者
  2. 消息分发给两个消费者(不均匀)
  3. 两个消费者都收到了相同的消息

从生产者方面来看,消息在两个分片之间均匀分布。我想知道 kinesis 适配器如何在消费者之间分发消息,如果支持,我如何在消费者之间获得均匀分布。

谢谢

更新(适配器配置)

@Bean
  public KinesisMessageDrivenChannelAdapter kinesisInboundChannelAdapter(
      AmazonKinesis amazonKinesis) {
    String[] streamNames = this.consumerClientProperties.getKinesis().getStreamNames();
    KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(new QueueChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setCheckpointMode(CheckpointMode.record);
    adapter.setStartTimeout(10000);
    adapter.setConsumerGroup(consumerClientProperties.getName());
    adapter.setListenerMode(ListenerMode.record);
    adapter.setDescribeStreamRetries(1);
    return adapter;
  }

  @Bean
  public DynamoDbMetadataStore dynamoDbMetaDataStore() {
    DynamoDbMetadataStore dynamoDbMetaDataStore = new DynamoDbMetadataStore(amazonDynamoDB(),
        consumerClientProperties.getName());
    return dynamoDbMetaDataStore;
  }
4

1 回答 1

0

建议大家升级到最新的Spring Integration AWS 2.0https ://spring.io/blog/2018/08/21/spring-integration-for-aws-2-0-ga-and-spring-cloud-流运动粘合剂 1-0-ga

在 Kinesis 消费者级别上进行了大量修复,现在我们有一个领导者选举,不会多次订阅同一个分片。

这个想法是在我们处理记录时有严格的顺序,因此每个集群只有一个线程应该可以访问一个分片。该线程可能会处理多个分片。

无论如何,如果您使用应用程序的两个实例,您需要注入一个MetadataStore基于共享数据的实例,例如DynamoDbMetadataStore.

于 2018-08-31T14:00:32.207 回答