
I am running a 4-broker Kafka cluster in Kubernetes. The replication factor is 3 and min.insync.replicas is 2.

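There is also a producer service (running Spring Stream) that generates messages, and a consumer service that reads from the topic. I am now trying to upgrade the Kafka cluster with a rolling update, hoping for no downtime, but during the update the producer's log fills up with this error: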

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

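By my calculation, there should be no problem when 1 broker is down, because the minimum ISR is 2: with a replication factor of 3, two replicas of each partition are still available. However, the producer service does not seem to notice the rolling update and keeps sending messages to the same broker...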
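To spell out the calculation I am relying on, here is a minimal sketch. The function names are hypothetical and only illustrate the arithmetic; it assumes all replicas were in sync before the update and that the producer uses acks=all. It only counts replicas, and says nothing about how quickly the producer learns which broker is the new leader.

```python
def remaining_in_sync(replication_factor: int, brokers_down: int) -> int:
    """Worst case: every broker that is down hosted a replica of the partition."""
    return max(replication_factor - brokers_down, 0)


def can_accept_writes(replication_factor: int,
                      min_insync_replicas: int,
                      brokers_down: int) -> bool:
    """With acks=all, a write needs at least min.insync.replicas in-sync replicas."""
    return remaining_in_sync(replication_factor, brokers_down) >= min_insync_replicas


# Values from my setup: replication factor 3, min.insync.replicas 2.
print(can_accept_writes(3, 2, brokers_down=1))  # one broker restarting
print(can_accept_writes(3, 2, brokers_down=2))  # two brokers down at once
```

With one broker restarting, 2 replicas remain, which meets the minimum of 2, so I expected writes to keep succeeding. With two brokers down only 1 replica would remain and writes would be rejected.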

Any ideas on how to solve this?

Here is my kafka.yaml:

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
  namespace: default
  labels:
    app: kafka
spec:
  serviceName: kafka
  replicas: 4
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9308"
    spec:
      nodeSelector:
        middleware.node: "true"
      imagePullSecrets:
      - name: nexus-registry
      terminationGracePeriodSeconds: 300
      containers:
      - name: kafka
        image: kafka:2.12-2.1.0
        imagePullPolicy: IfNotPresent

        resources:
          limits:
            cpu: 3000m
            memory: 1800Mi
          requests:
            cpu: 2000m
            memory: 1800Mi
        env:

        # Replication
        - name: KAFKA_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_MIN_INSYNC_REPLICAS
          value: "2"

        # Protocol Version
        - name: KAFKA_INTER_BROKER_PROTOCOL_VERSION
          value: "2.1"
        - name: KAFKA_LOG_MESSAGE_FORMAT_VERSION
          value: "2.1"

        - name: ENABLE_AUTO_EXTEND
          value: "true"
        - name: KAFKA_DELETE_TOPIC_ENABLE
          value: "true"
        - name: KAFKA_RESERVED_BROKER_MAX_ID
          value: "999999999"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR
          value: "10"
        - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_LOG_RETENTION_BYTES
          value: "1800000000000"
        - name: KAFKA_ADVERTISED_HOST_NAME
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: KAFKA_OFFSETS_RETENTION_MINUTES
          value: "10080"
        - name: KAFKA_ZOOKEEPER_CONNECT
          valueFrom:
            configMapKeyRef:
              name: zk-config
              key: zk.endpoints
        - name: KAFKA_LOG_DIRS
          value: /kafka/kafka-logs
        ports:
        - name: kafka
          containerPort: 9092
        - name: prometheus
          containerPort: 7071
        volumeMounts:
        - name: data
          mountPath: /kafka
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
          failureThreshold: 12
          initialDelaySeconds: 10
          periodSeconds: 30
          successThreshold: 1
      - name: kafka-exporter
        image: danielqsj/kafka-exporter:latest
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 500m
            memory: 500Mi
        ports:
        - containerPort: 9308
  volumeClaimTemplates:
  - metadata:
      name: data
      labels:
        app: kafka
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 2000Gi
