1

我试图为 Aws kinesis 流消费者实现负载平衡

根据我正在尝试实施的文档

spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json

我有3 个容器,如果需要,我想在不重新启动现有容器的情况下启动新容器(最多 6 个) 。

  1. instanceIndex 从 0 或 1 开始。
  2. 如果我将 instanceCount 设为 6,但只启动了三个实例,那么在我启动新实例之前,是否会消耗所有消息。
  3. 在文档中,有一个名为 spring.cloud.stream.bindings..consumer.concurrency 的属性,你能帮助它的重要性。
  4. 由于某些原因,如果任何一个实例出现故障,是否有任何消息未被消费。

你能帮助我们吗

4

1 回答 1

1

spring.cloud.stream.bindings..consumer.concurrency是每个消费者的内部选项:

adapter.setConcurrency(properties.getConcurrency());

...

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

因此,这对您的分布式解决方案没有任何作用。

instanceIndexinstanceCount在活页夹中的工作方式如下:

    if (properties.getInstanceCount() > 1) {
        shardOffsets = new HashSet<>();
        KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
        List<Shard> shards = kinesisConsumerDestination.getShards();
        for (int i = 0; i < shards.size(); i++) {
            // divide shards across instances
            if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                KinesisShardOffset shardOffset = new KinesisShardOffset(
                        kinesisShardOffset);
                shardOffset.setStream(destination.getName());
                shardOffset.setShard(shards.get(i).getShardId());
                shardOffsets.add(shardOffset);
            }
        }
    }

因此,每个消费者都会在流中获得一个分片子集。因此,如果您有更多的分片,那么您最终可能会遇到一些分片没有被消耗的事实。

没有什么可以同时消费来自同一个分片的消息:每个集群只有一个线程可以消费一个分片。

于 2019-04-01T17:15:41.207 回答