I have a Spring Cloud Stream Kafka Streams application that reads from a topic (event) and performs some simple processing:
@Configuration
class EventKStreamConfiguration {

    private val logger = LoggerFactory.getLogger(javaClass)

    @StreamListener
    fun process(@Input("event") eventStream: KStream<String, EventReceived>) {
        eventStream.foreach { key, value ->
            logger.info("--------> Processing Event {}", value)
            // Save in DB
        }
    }
}
The application uses a Kafka environment from Confluent Cloud, and the event topic has 6 partitions. The full configuration is:
spring:
  application:
    name: events-processor
  cloud:
    stream:
      schema-registry-client:
        endpoint: ${schema-registry-url:http://localhost:8081}
      kafka:
        streams:
          binder:
            brokers: ${kafka-brokers:localhost}
            configuration:
              application:
                id: ${spring.application.name}
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              processing:
                guarantee: exactly_once
          bindings:
            event:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        event:
          destination: event
  data:
    mongodb:
      uri: ${mongodb-uri:mongodb://localhost/test}
server:
  port: 8085
logging:
  level:
    org.springframework.kafka.config: debug
---
spring:
  profiles: confluent-cloud
  cloud:
    stream:
      kafka:
        streams:
          binder:
            autoCreateTopics: false
            configuration:
              retry:
                backoff:
                  ms: 500
              security:
                protocol: SASL_SSL
              sasl:
                mechanism: PLAIN
                jaas:
                  config: xxx
              basic:
                auth:
                  credentials:
                    source: USER_INFO
              schema:
                registry:
                  basic:
                    auth:
                      user:
                        info: yyy
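The KStream processes messages correctly. If I restart the application, they are not reprocessed. Note: I don't want them to be reprocessed, so this behaviour is fine.
However, the startup logs show some odd things:
- First, they show the creation of a restore consumer client, with auto.offset.reset set to none: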
2019-07-19 10:20:17.120 INFO 82473 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] Creating restore consumer client
2019-07-19 10:20:17.123 INFO 82473 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
- Then they show the creation of a consumer client with auto.offset.reset set to earliest:
2019-07-19 10:20:17.235 INFO 82473 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] Creating consumer client
2019-07-19 10:20:17.241 INFO 82473 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
- The final lines of the startup log show the offsets being reset to 0. This happens every time I restart the application:
2019-07-19 10:20:31.577 INFO 82473 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2019-07-19 10:20:31.578 INFO 82473 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f] State transition from REBALANCING to RUNNING
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-3 to offset 0.
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-0 to offset 0.
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-1 to offset 0.
2019-07-19 10:20:31.669 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-5 to offset 0.
2019-07-19 10:20:31.670 INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-4 to offset 0.
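- Why are two consumers configured?
- Why does the second one have auto.offset.reset = earliest when I have not configured it explicitly and the Kafka default is latest?
- I want the default (auto.offset.reset = latest) behaviour, and it seems to be working fine. However, doesn't that contradict what I see in the logs?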
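For reference, here is a minimal sketch of how the reset policy could be set explicitly instead of relying on defaults. It uses only java.util.Properties with literal keys so it stays self-contained; the helper name streamsProps is hypothetical and not part of my application, and the application id and broker address are placeholders. As far as I understand, Kafka Streams sets auto.offset.reset to earliest for its main consumer unless overridden, forces none for the restore consumer, and reads consumer-level overrides from keys prefixed with "consumer.".

```kotlin
import java.util.Properties

// Hypothetical helper (not in the application above): builds Streams properties
// with an explicit reset policy for the consumers that Kafka Streams creates.
fun streamsProps(resetPolicy: String): Properties {
    val props = Properties()
    props["application.id"] = "streams-wordcount"   // placeholder id
    props["bootstrap.servers"] = "localhost:9092"   // placeholder broker
    // "consumer." is the prefix Kafka Streams uses for consumer-level settings;
    // StreamsConfig.consumerPrefix("auto.offset.reset") builds the same key.
    props["consumer.auto.offset.reset"] = resetPolicy
    return props
}

fun main() {
    val props = streamsProps("latest")
    println(props.getProperty("consumer.auto.offset.reset"))
}
```

These properties would then be passed to the KafkaStreams constructor. With the Spring Cloud Stream binder, the equivalent would presumably go under the binder's configuration section, but I have not verified that.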
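Update:
I would rephrase the third question as follows: why do the logs show the partitions being reset to 0 on every restart when, despite that, no messages are redelivered to the KStream?
Update 2:
I have simplified the scenario, this time using a plain Kafka Streams application. The behaviour is exactly the same as the one observed with Spring Cloud Stream. However, after inspecting the consumer group and the partitions, I found that it makes sense.
The Streams application: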
import java.util.Properties
import java.util.concurrent.CountDownLatch
import kotlin.system.exitProcess
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig

fun main() {
    val props = Properties()
    props[StreamsConfig.APPLICATION_ID_CONFIG] = "streams-wordcount"
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
    props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name

    val builder = StreamsBuilder()
    val source = builder.stream<String, String>("streams-plaintext-input")
    // Print every record that reaches the stream
    source.foreach { key, value -> println("$key $value") }

    val streams = KafkaStreams(builder.build(), props)
    val latch = CountDownLatch(1)

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(object : Thread("streams-wordcount-shutdown-hook") {
        override fun run() {
            streams.close()
            latch.countDown()
        }
    })

    try {
        streams.start()
        latch.await()
    } catch (e: Throwable) {
        exitProcess(1)
    }
    exitProcess(0)
}
This is what I see:
1) With an empty topic, startup shows all partitions being reset to offset 0:
07:55:03.885 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-2 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-4 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0
2) I put one message in the topic and inspect the consumer group; the record is in partition 4:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
streams-plaintext-input 0 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 5 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 1 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 2 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 3 - 0 - streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 4 1 1 0 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1 streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
3) I restart the application. Now the reset only affects the empty partitions (0, 1, 2, 3, 5):
07:57:39.477 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-2 to offset 0.
07:57:39.478 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
07:57:39.478 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
07:57:39.479 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
07:57:39.479 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0.
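4) I insert another message and inspect the consumer group state. The same thing happens: the record is in partition 2, and when the application is restarted only the empty partitions (0, 1, 3, 5) are reset: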
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
streams-plaintext-input 0 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 5 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 1 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 2 1 1 0 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 3 - 0 - streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 4 1 1 0 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1 streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
08:00:42.313 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0.
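The pattern in steps 1 to 4 can be summarised with a small model: a partition is reset only when the group has no committed offset for it (shown as "-" in the CURRENT-OFFSET column); otherwise consumption resumes from the committed offset. The sketch below is a simplified illustration of that rule, not the Kafka client's actual code; the names ResetPolicy and startingOffset are made up for this example.

```kotlin
// Simplified model of how a consumer picks its starting position for a partition.
// Illustration only; names are hypothetical and this is not Kafka's implementation.
enum class ResetPolicy { EARLIEST, LATEST }

// Returns (offset to start from, whether a reset happened).
fun startingOffset(
    committed: Long?,   // null = no committed offset for the group ("-" in the table)
    logStart: Long,
    logEnd: Long,
    policy: ResetPolicy
): Pair<Long, Boolean> =
    if (committed != null) {
        committed to false              // resume where the group left off, no reset
    } else when (policy) {
        ResetPolicy.EARLIEST -> logStart to true
        ResetPolicy.LATEST -> logEnd to true
    }

fun main() {
    // State after step 4: partitions 2 and 4 have committed offset 1, the rest have none.
    val committed = mapOf<Int, Long?>(0 to null, 1 to null, 2 to 1L, 3 to null, 4 to 1L, 5 to null)
    val logEnd = mapOf(0 to 0L, 1 to 0L, 2 to 1L, 3 to 0L, 4 to 1L, 5 to 0L)
    for ((partition, offset) in committed) {
        val (start, reset) =
            startingOffset(offset, 0L, logEnd.getValue(partition), ResetPolicy.EARLIEST)
        println("partition $partition: start=$start reset=$reset")
    }
}
```

In this model partitions 0, 1, 3 and 5 are reset to 0 while partitions 2 and 4 resume at offset 1, which matches the restart log in step 4. For an empty partition, earliest and latest both resolve to offset 0, so the reset message alone does not reveal which policy is in effect.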