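I've been playing with spring-cloud-stream (1.0.0.BUILD-SNAPSHOT with the kafka binder) and noticed that any messages sent while the consumer is offline are lost. When I start the consumer, it does not process the backlog of requests that were sent to kafka. Is this intentional?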
2 Answers
We definitely need to improve our documentation, but here are some pointers in the meantime.
If you want the consumer to process messages produced while it was stopped, you need to specify a consumer group name, e.g. spring.cloud.stream.bindings.<bindingName>.group=foo. When a consumer group is specified, the application will start at either a) the first message not yet consumed, if a client with the same consumer group has already run (i.e. we recorded consumed offsets for that group), or b) the value specified by spring.cloud.stream.binder.kafka.start-offset (which can be earliest or latest, representing the start or the end of the topic). So restarted consumers that keep the same consumer group will resume from where they left off, and new consumers will start according to the start-offset option. If a group is not specified, the consumer is considered 'anonymous' and is only interested in messages produced after it has started, so it always starts at the end of the partition set.
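As a minimal sketch of the settings described above, the following YAML gives a consumer a group and a start position. The binding name input is a hypothetical stand-in for <bindingName>, and the property keys are the ones quoted in this answer for the 1.0.0 snapshot; other releases may name them differently.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:                       # hypothetical binding name; use your own
          group: foo                 # named group, so consumed offsets are remembered
      binder:
        kafka:
          start-offset: earliest     # used only when the group has no saved offsets
```

With this in place, a first run of group foo would read the topic from the beginning, and later restarts with the same group would resume from the saved offsets.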
If you want to override the already saved offsets, you can set spring.cloud.stream.binder.kafka.reset-offsets=true, which causes the client to reset the saved offsets and start at the value indicated by spring.cloud.stream.binder.kafka.start-offset.
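For illustration, the reset option might be combined with a start position like this. This is a sketch that assumes the property keys exactly as quoted in this answer (with the reset key spelled reset-offsets); check them against the binder version you run.

```yaml
spring:
  cloud:
    stream:
      binder:
        kafka:
          reset-offsets: true        # discard the offsets saved for the group
          start-offset: earliest     # then start from the beginning of the topic
```

Leaving reset-offsets enabled would presumably replay the topic on every restart, so it is likely something to switch on only for a one-off reprocessing run.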
This reflects the behaviour expected by (and supported with) Kafka 0.8.2. We will update things accordingly once we upgrade to 0.9.
Yes; the default is to start listening from the end of the topic.
Use:
spring:
  cloud:
    stream:
      binder:
        kafka:
          start-offset: earliest