
I have a requirement to process messages from Kafka without losing any of them, and I also need to preserve message ordering. So I use transactions in my Kafka Streams topology and have enabled the "exactly_once" processing guarantee, because I assumed topology processing would be "all or nothing", i.e. the message offset is only committed once the last node has successfully processed the message.
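
For reference, a minimal sketch of the StreamsConfig properties behind this guarantee (illustrative only; the actual setup goes through the YAML config shown further below):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: the StreamsConfig equivalent of the "exactly_once" guarantee.
// With EOS, source offsets are committed as part of the producer transaction, so a
// failed (aborted) task leaves them uncommitted.
public class ExactlyOnceConfigSketch {

    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eventsTopology");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}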

But consider a failure case, for example when the database is down and the processor cannot store the message and throws an exception. The topology then dies as expected and is automatically recreated on rebalance. I assumed the topology would re-consume the original message from the Kafka topic, or at least that it would re-consume it when the application is restarted. However, the original message seems to be gone: it is never consumed or processed again after that topology dies.

What do I need to do to get the original message that was sent to the Kafka topic reprocessed? Or what needs to change in the Kafka configuration? Do I need to manually assign a state store and track the processed messages on a changelog topic?
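
For reference, a minimal sketch (assuming Kafka Streams 2.8+; not part of the code below) of the uncaught-exception-handler API that decides whether a dead StreamThread gets replaced, since replacing the thread is what would make the task retry from the last committed offset:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

// Sketch only: by default a thread that throws out of process() dies, and once all
// threads are dead nothing re-consumes the record whose offset was never committed.
public class ReplaceThreadHandlerSketch {

    public static void install(KafkaStreams streams) {
        final StreamsUncaughtExceptionHandler handler = exception -> {
            // Replacing the thread retries the failed task from the last committed
            // offset; the other responses are SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION.
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        };
        streams.setUncaughtExceptionHandler(handler);
    }
}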

Topology:

@Singleton
public class EventTopology extends Topology {

    private final Deserializer<String> deserializer = Serdes.String().deserializer();
    private final Serializer<String> serializer = Serdes.String().serializer();

    private final EventLogMessageSerializer eventLogMessageSerializer;
    private final EventLogMessageDeserializer eventLogMessageDeserializer;
    private final EventLogProcessorSupplier eventLogProcessorSupplier;

    @Inject
    public EventTopology(EventsConfig eventsConfig,
                         EventLogMessageSerializer eventLogMessageSerializer,
                         EventLogMessageDeserializer eventLogMessageDeserializer,
                         EventLogProcessorSupplier eventLogProcessorSupplier) {

        this.eventLogMessageSerializer = eventLogMessageSerializer;
        this.eventLogMessageDeserializer = eventLogMessageDeserializer;
        this.eventLogProcessorSupplier = eventLogProcessorSupplier;
        init(eventsConfig);
    }

    private void init(EventsConfig eventsConfig) {
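        // Wires a single source node ("EventsLogSource") reading the event-log topic
        // with a String key deserializer and the custom EventLogMessage deserializer
        // into a single processor node ("EventLogProcessor"); there is no sink node.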

        var topics = eventsConfig.getTopicConfig().getTopics();
        String eventLog = topics.get("eventLog");

        addSource("EventsLogSource", deserializer, eventLogMessageDeserializer, eventLog)
             .addProcessor("EventLogProcessor", eventLogProcessorSupplier, "EventsLogSource");
    }
}

Processor:

@Singleton
@Slf4j
public class EventLogProcessor implements Processor<String, EventLogMessage> {

    private final EventLogService eventLogService;
    private ProcessorContext context;

    @Inject
    public EventLogProcessor(EventLogService eventLogService) {
        this.eventLogService = eventLogService;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, EventLogMessage value) {
        log.info("Processing EventLogMessage={}", value);

        try {
            eventLogService.storeInDatabase(value);
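            // commit() only requests a commit; under exactly_once the offset is
            // actually committed when the Streams transaction commits.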
            context.commit();

        } catch (Exception e) {
            log.warn("Failed to process EventLogMessage={}", value, e);
            throw e;
        }
    }
    
    @Override
    public void close() {
    }
}

Configuration:

eventsConfig:
  saveTopicsEnabled: false
  topologyConfig:
    environment: "LOCAL"
    broker: "localhost:9093"
    enabled: true
    initialiseWaitInterval: 3 seconds
    applicationId: "eventsTopology"
    config:
      auto.offset.reset: latest
      session.timeout.ms: 6000
      fetch.max.wait.ms: 7000
      heartbeat.interval.ms: 5000
      connections.max.idle.ms: 7000
      security.protocol: SSL
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      max.poll.records: 5
      processing.guarantee: exactly_once
      metric.reporters: com.simple.metrics.kafka.DropwizardReporter
      default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
      enable.idempotence: true
      request.timeout.ms: 8000
      acks: all
      batch.size: 16384
      linger.ms: 1
      enable.auto.commit: false
      state.dir: "/tmp"
  topicConfig:
      topics:
        eventLog: "EVENT-LOG-LOCAL"
      kafkaTopicConfig:
        partitions: 18
        replicationFactor: 1
        config:
          retention.ms: 604800000
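
One note on the config above: auto.offset.reset applies only to partitions for which the group has no committed offset, and under exactly_once an aborted transaction leaves the source offset uncommitted. A minimal sketch (illustrative, not part of the current setup) of overriding the embedded consumer's reset policy via StreamsConfig:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative sketch: have the embedded consumer read a partition with no committed
// offset from the beginning instead of the end.
public class OffsetResetSketch {

    public static Properties withEarliestReset(Properties streamsProps) {
        streamsProps.put(
                StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
                "earliest");
        return streamsProps;
    }
}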

Test:

Feature: Feature covering the scenarios to process event log messages produced by external client.

  Background:
    Given event topology is healthy

  Scenario: event log messages produced are successfully stored in the database
    Given database is down
    And the following event log messages are published
      | deptId | userId     | eventType | endDate              | eventPayload_partner |
      | dept-1 | user-1234  | CREATE    | 2021-04-15T00:00:00Z | PARTNER-1            |
    When database is up
    And database is healthy
    Then event log stored in the database as follows
      | dept_id | user_id   | event_type | end_date             | event_payload           |
      | dept-1  | user-1234 | CREATE     | 2021-04-15T00:00:00Z | {"partner":"PARTNER-1"} |
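
The assertion that times out in the logs below comes from a step roughly like this (a hypothetical reconstruction of steps.EventLogSteps; the EventLogDao name and method are illustrative, the real step definition is not shown here):

import static com.jayway.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;

import java.util.concurrent.TimeUnit;

// Hypothetical reconstruction of the failing "Then" step: poll the database until
// exactly one event-log row has been stored, giving up after 20 seconds.
public class EventLogStepsSketch {

    private final EventLogDao eventLogDao; // hypothetical DAO used by the step

    public EventLogStepsSketch(EventLogDao eventLogDao) {
        this.eventLogDao = eventLogDao;
    }

    public void eventLogStoredInDatabase() {
        await().atMost(20, TimeUnit.SECONDS)
               .until(() -> eventLogDao.countByUserId("user-1234"), equalTo(1));
    }

    // Minimal interface so the sketch is self-contained.
    public interface EventLogDao {
        int countByUserId(String userId);
    }
}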

Logs:

INFO  [data-plane-kafka-request-handler-1] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group eventsTopology in state PreparingRebalance with old generation 0 (__consumer_offsets-0) (reason: Adding new member eventsTopology-57fdac0e-09fb-4aa0-8b0b-7e01809b31fa-StreamThread-1-consumer-96a3e980-4286-461e-8536-5f04ccb2c778 with group instance id None) 
INFO  [executor-Rebalance] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Stabilized group eventsTopology generation 1 (__consumer_offsets-0) 
INFO  [data-plane-kafka-request-handler-2] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Assignment received from leader for group eventsTopology for generation 1 
INFO  [data-plane-kafka-request-handler-1] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_0 with producerId 0 and producer epoch 0 on partition __transaction_state-4 
INFO  [data-plane-kafka-request-handler-6] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_1 with producerId 1 and producer epoch 0 on partition __transaction_state-3 
...
INFO  [data-plane-kafka-request-handler-0] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_16 with producerId 17 and producer epoch 0 on partition __transaction_state-37 
INFO  [data-plane-kafka-request-handler-4] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_1 with producerId 18 and producer epoch 0 on partition __transaction_state-42 
INFO  [data-plane-kafka-request-handler-6] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_0 with producerId 19 and producer epoch 0 on partition __transaction_state-43 
...
INFO  [data-plane-kafka-request-handler-3] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_17 with producerId 34 and producer epoch 0 on partition __transaction_state-45 
INFO  [data-plane-kafka-request-handler-5] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_16 with producerId 35 and producer epoch 0 on partition __transaction_state-46 
INFO  [pool-26-thread-1] ManagerClient - Manager request {uri:http://localhost:8081/healthcheck, method:GET, body:'', headers:{}} 
INFO  [pool-26-thread-1] ManagerClient - Manager response from with body {"Database":{"healthy":true},"eventsTopology":{"healthy":true}} 
INFO  [dw-admin-130] KafkaConnectionCheck - successfully connected to kafka broker: localhost:9093 
INFO  [kafka-producer-network-thread | EVENT-LOG-LOCAL-test-client-id] LocalTestEnvironment - Message: ProducerRecord(topic=EVENT-LOG-LOCAL, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={"endDate":1618444800000,"deptId":"dept-1","userId":"user-1234","eventType":"CREATE","eventPayload":{"previousEndDate":null,"partner":"PARTNER-1","info":null}}, timestamp=null) pushed onto topic: EVENT-LOG-LOCAL 
INFO  [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] EventLogProcessor - Processing EventLogMessage=EventLogMessage(endDate=Thu Apr 15 01:00:00 BST 2021, deptId=dept-1, userId=user-1234, eventType=CREATE, eventPayload=EventLogMessage.EventPayload(previousEndDate=null, partner=PARTNER-1, info=null)) 
WARN  [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] EventLogProcessor - Failed to process EventLogMessage=EventLogMessage(endDate=Thu Apr 15 01:00:00 BST 2021, deptId=dept-1, userId=user-1234, eventType=CREATE, eventPayload=EventLogMessage.EventPayload(previousEndDate=null, partner=PARTNER-1, info=null)) 
    exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at manager.service.EventLogService.storeInDatabase(EventLogService.java:24)
    at manager.topology.processor.EventLogProcessor.process(EventLogProcessor.java:47)
    at manager.topology.processor.EventLogProcessor.process(EventLogProcessor.java:19)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
ERROR [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] Failed to process stream task 0_8 due to the following error: 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_8, processor=EventsLogSource, topic=EVENT-LOG-LOCAL, partition=8, offset=0, stacktrace=exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
ERROR [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:  
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_8, processor=EventsLogSource, topic=EVENT-LOG-LOCAL, partition=8, offset=0, stacktrace=exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
ERROR [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1] org.apache.kafka.streams.KafkaStreams - stream-client [eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3] All stream threads have died. The instance will be in error state and should be closed. 
Exception: java.lang.IllegalStateException thrown from the UncaughtExceptionHandler in thread "eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1"
INFO  [executor-Heartbeat] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Member eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1-consumer-f11ca299-2a68-4317-a559-dd1b96cd431f in group eventsTopology has failed, removing it from the group 
INFO  [executor-Heartbeat] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group eventsTopology in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: removing member eventsTopology-b21df600-cd39-4c9d-9e7a-f55f53ac9fd3-StreamThread-1-consumer-f11ca299-2a68-4317-a559-dd1b96cd431f on heartbeat expiration) 
INFO  [data-plane-kafka-request-handler-2] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Stabilized group eventsTopology generation 2 (__consumer_offsets-0) 
INFO  [data-plane-kafka-request-handler-6] kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Assignment received from leader for group eventsTopology for generation 2 
INFO  [data-plane-kafka-request-handler-0] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-0_0 with producerId 0 and producer epoch 1 on partition __transaction_state-4 
...
INFO  [data-plane-kafka-request-handler-0] kafka.coordinator.transaction.TransactionCoordinator - [TransactionCoordinator id=0] Initialized transactionalId eventsTopology-1_16 with producerId 35 and producer epoch 1 on partition __transaction_state-46 
INFO  [main] Cluster - New databse host localhost/127.0.0.1:59423 added 

com.jayway.awaitility.core.ConditionTimeoutException: Condition defined as a lambda expression in steps.EventLogSteps
Expecting:
 <0>
to be equal to:
 <1>
but was not. within 20 seconds. 