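I am trying to use kafka-python to access a Strimzi Kafka cluster deployed in a k8s cluster on AWS (kops, not EKS).

Here is the listener setup:

  • I am using an external listener of type loadbalancer
  • TLS encryption is disabled on the listener
  • Authentication is SCRAM-SHA-512
  • I want to access the cluster securely, but I cannot provide certificates to the brokers for secure connections. So I use overrides to set up SSL on the LoadBalancers instead of on the Kafka side.

Here is the configuration: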

spec:
  kafka:
    version: 2.5.0
    replicas: 3
    listeners:
      external:
        port: 9094
        tls: false
        type: loadbalancer
        authentication:
          type: scram-sha-512
        configuration:
          bootstrap:
            host: kafka-bootstrap.mydomain.com
          brokers:
          - broker: 0
            host: kafka-broker-0.mydomain.com
          - broker: 1
            host: kafka-broker-1.mydomain.com
          - broker: 2
            host: kafka-broker-2.mydomain.com
        overrides:
          bootstrap:
            address: kafka-bootstrap.mydomain.com
            dnsAnnotations:
              # external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
          brokers:
          - advertisedHost: kafka-broker-0.mydomain.com
            advertisedPort: 9094
            broker: 0
            dnsAnnotations:
              # external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
          - advertisedHost: kafka-broker-1.mydomain.com
            advertisedPort: 9094
            broker: 1
            dnsAnnotations:
              # external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
          - advertisedHost: kafka-broker-2.mydomain.com
            advertisedPort: 9094
            broker: 2
            dnsAnnotations:
              external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"

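I can access the cluster with the following setup:

  • No authentication (no SCRAM-SHA-512 or any other method)
  • Everything else the same as the configuration above (the only difference is that authentication is off)

Client configuration (I am using kafka-python):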

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['kafka-bootstrap.mydomain.com:9094'],
    client_id="test-consumer",
    security_protocol="SSL",  # TLS is terminated at the AWS load balancer
    api_version=(2, 5, 0),
)

However, I cannot access the cluster once SCRAM-SHA-512 authentication is enabled. I get the following error in the Python client:

DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <disconnected> [unspecified None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <disconnected> [IPv4 ('x.x.x.x', 9094)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <connecting> [IPv4 ('x.x.x.x', 9094)]>: connecting to kafka-bootstrap.mydomain.com [('x.x.x.x', 9094) IPv4]
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<connecting> [IPv4 ('x.x.x.x', 9094)]>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<connecting> [IPv4 ('x.x.x.x', 9094)]>: initiating SSL handshake
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<handshake> [IPv4 ('x.x.x.x', 9094)]>: configuring default SSL Context
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<handshake> [IPv4 ('x.x.x.x', 9094)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: wrapping socket in ssl context
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: completed SSL handshake.
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]> Request 1: SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<authenticating> [IPv4 ('x.x.x.x', 9094)]> Response 1 (220.21484375 ms): SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['SCRAM-SHA-512'])
ERROR:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Error receiving reply from server
Traceback (most recent call last):
  File "/home/something/.local/lib/python3.8/site-packages/kafka/conn.py", line 692, in _try_authenticate_scram
    (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
  File "/home/something/.local/lib/python3.8/site-packages/kafka/conn.py", line 616, in _recv_bytes_blocking
    raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Closing connection. KafkaConnectionError: <BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Connection reset during recv
DEBUG:kafka.client:Initializing connection to node bootstrap-0 for metadata request

This error repeats as the client retries.
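For context on the traceback: the failing line reads a 4-byte big-endian length prefix before the SCRAM reply, and the error is raised while waiting for those 4 bytes. The sketch below reproduces that framing with the standard library only. It assumes the error is raised when the read returns no data because the peer closed the connection, which is how I read the traceback. The names recv_exactly, recv_frame and demo are illustrative and are not kafka-python functions.

```python
import socket
import struct


def recv_exactly(sock, n):
    """Read exactly n bytes, raising if the peer closes the connection first."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # An empty read means the other side closed the connection.
            raise ConnectionError('Connection reset during recv')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def recv_frame(sock):
    """Read one frame: a 4-byte big-endian signed size, then that many bytes."""
    (size,) = struct.unpack('>i', recv_exactly(sock, 4))
    return recv_exactly(sock, size)


def demo():
    # Case 1: the peer answers with a properly framed message.
    client, server = socket.socketpair()
    payload = b'server-first-message'
    server.sendall(struct.pack('>i', len(payload)) + payload)
    ok = recv_frame(client)
    client.close()
    server.close()

    # Case 2: the peer closes the connection without answering.
    client, server = socket.socketpair()
    server.close()
    try:
        recv_frame(client)
        error = None
    except ConnectionError as exc:
        error = str(exc)
    finally:
        client.close()
    return ok, error


if __name__ == '__main__':
    print(demo())
```

In the second case the client fails in the same place as my log: it is waiting for the length prefix when the other end goes away. So something on the path (the load balancer or the broker) appears to close the connection right after the SASL handshake response.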

Here is the Python client configuration that produces this error:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['kafka-bootstrap.mydomain.com:9094'],
    client_id="test-consumer",
    security_protocol="SASL_SSL",  # TLS to the load balancer plus SASL authentication
    api_version=(2, 5, 0),
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username='someusername',
    sasl_plain_password='somerandom',
)
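To make the difference between the two client setups explicit, both can be generated from one helper, so that the SASL settings are the only thing that changes. This is only a sketch: build_consumer_kwargs is a hypothetical helper name of my own, and the returned dictionary is meant to be passed to KafkaConsumer(**kwargs).

```python
def build_consumer_kwargs(bootstrap, username=None, password=None):
    """Return KafkaConsumer keyword arguments for the two setups in this post.

    Without credentials this yields the working SSL-only configuration.
    With credentials it yields the failing SASL_SSL / SCRAM-SHA-512 one.
    """
    kwargs = {
        'bootstrap_servers': [bootstrap],
        'client_id': 'test-consumer',
        'security_protocol': 'SSL',
        'api_version': (2, 5, 0),
    }
    if username is not None:
        # Only these four settings differ between the two configurations.
        kwargs.update(
            security_protocol='SASL_SSL',
            sasl_mechanism='SCRAM-SHA-512',
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
    return kwargs


if __name__ == '__main__':
    # Usage (requires kafka-python):
    #   from kafka import KafkaConsumer
    #   consumer = KafkaConsumer(**build_consumer_kwargs(host, user, pw))
    print(build_consumer_kwargs('kafka-bootstrap.mydomain.com:9094'))
    print(build_consumer_kwargs('kafka-bootstrap.mydomain.com:9094',
                                'someusername', 'somerandom'))
```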

Is this caused by a mistake in the Kafka server or client configuration? I have already successfully created a KafkaUser resource in the k8s cluster.

Or is it because Kafka needs SSL on the Kafka server side (rather than on the LoadBalancer side) for SASL SCRAM-SHA-512 to work? If so, is there any workaround?
