这spring.cloud.stream.bindings..consumer.concurrency
是每个消费者的内部选项:
adapter.setConcurrency(properties.getConcurrency());
...
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
因此,这对您的分布式解决方案没有任何作用。
instanceIndex
和instanceCount
在活页夹中的工作方式如下:
if (properties.getInstanceCount() > 1) {
shardOffsets = new HashSet<>();
KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
List<Shard> shards = kinesisConsumerDestination.getShards();
for (int i = 0; i < shards.size(); i++) {
// divide shards across instances
if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
KinesisShardOffset shardOffset = new KinesisShardOffset(
kinesisShardOffset);
shardOffset.setStream(destination.getName());
shardOffset.setShard(shards.get(i).getShardId());
shardOffsets.add(shardOffset);
}
}
}
因此,每个消费者都会在流中获得一个分片子集。因此,如果您有更多的分片,那么您最终可能会遇到一些分片没有被消耗的事实。
没有什么可以同时消费来自同一个分片的消息:每个集群只有一个线程可以消费一个分片。