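I am trying to use kafka-python to access a Strimzi Kafka cluster running in a k8s cluster on AWS (deployed with kops, not EKS).
Here is the listener setup:
- I am using an external listener of type loadbalancer
- TLS encryption is disabled
- Authentication is SCRAM-SHA-512
- I want to access the cluster securely, but I cannot provide certificates to the brokers for a secure connection. So I use overrides to set up SSL on the LoadBalancers instead of on the Kafka side.
Here is the configuration: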
spec:
  kafka:
    version: 2.5.0
    replicas: 3
    listeners:
      external:
        port: 9094
        tls: false
        type: loadbalancer
        authentication:
          type: scram-sha-512
        configuration:
          bootstrap:
            host: kafka-bootstrap.mydomain.com
          brokers:
          - broker: 0
            host: kafka-broker-0.mydomain.com
          - broker: 1
            host: kafka-broker-1.mydomain.com
          - broker: 2
            host: kafka-broker-2.mydomain.com
        overrides:
          bootstrap:
            address: kafka-bootstrap.mydomain.com
            dnsAnnotations:
              # external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
          brokers:
          - advertisedHost: kafka-broker-0.mydomain.com
            advertisedPort: 9094
            broker: 0
            dnsAnnotations:
              # external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
          - advertisedHost: kafka-broker-1.mydomain.com
            advertisedPort: 9094
            broker: 1
            dnsAnnotations:
              # external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
          - advertisedHost: kafka-broker-2.mydomain.com
            advertisedPort: 9094
            broker: 2
            dnsAnnotations:
              external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com
              service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
              service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:....
              service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9094"
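I am able to access the cluster with:
- No authentication (no SCRAM-SHA-512 or any other method)
- Everything else the same as the configuration above (the only difference is that authentication is off)
Client configuration (I am using kafka-python):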
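from kafka import KafkaConsumer

# TLS terminates at the load balancer; no SASL settings in this case.
consumer = KafkaConsumer(
    bootstrap_servers=['kafka-bootstrap.mydomain.com:9094'],
    client_id="test-consumer",
    security_protocol="SSL",
    api_version=(2, 5, 0),
)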
But I cannot access the cluster once SCRAM-SHA-512 authentication is enabled. I get the following error in the Python client:
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <disconnected> [unspecified None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <disconnected> [IPv4 ('x.x.x.x', 9094)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <connecting> [IPv4 ('x.x.x.x', 9094)]>: connecting to kafka-bootstrap.mydomain.com [('x.x.x.x', 9094) IPv4]
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<connecting> [IPv4 ('x.x.x.x', 9094)]>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<connecting> [IPv4 ('x.x.x.x', 9094)]>: initiating SSL handshake
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<handshake> [IPv4 ('x.x.x.x', 9094)]>: configuring default SSL Context
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<handshake> [IPv4 ('x.x.x.x', 9094)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: wrapping socket in ssl context
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: completed SSL handshake.
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <handshake> [IPv4 ('x.x.x.x', 9094)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]> Request 1: SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com<authenticating> [IPv4 ('x.x.x.x', 9094)]> Response 1 (220.21484375 ms): SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['SCRAM-SHA-512'])
ERROR:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Error receiving reply from server
Traceback (most recent call last):
  File "/home/something/.local/lib/python3.8/site-packages/kafka/conn.py", line 692, in _try_authenticate_scram
    (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
  File "/home/something/.local/lib/python3.8/site-packages/kafka/conn.py", line 616, in _recv_bytes_blocking
    raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Closing connection. KafkaConnectionError: <BrokerConnection node_id=bootstrap-0 host=kafka-bootstrap.mydomain.com <authenticating> [IPv4 ('x.x.x.x', 9094)]>: Connection reset during recv
DEBUG:kafka.client:Initializing connection to node bootstrap-0 for metadata request
The error repeats as the client retries.
This is the Python client configuration that produces the error:
from kafka import KafkaConsumer

# Same as the working client, plus SASL/SCRAM credentials.
consumer = KafkaConsumer(
    bootstrap_servers=['kafka-bootstrap.mydomain.com:9094'],
    client_id="test-consumer",
    security_protocol="SASL_SSL",
    api_version=(2, 5, 0),
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username='someusername',
    sasl_plain_password='somerandom',
)
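To make the comparison explicit, here is a minimal sketch that puts the two client setups side by side and prints only the settings that differ. The names `COMMON`, `SSL_ONLY`, `SASL_SSL`, `config_diff` and `make_consumer` are hypothetical and exist only for this illustration; the values are copied from the snippets above. The `logging.basicConfig(level=logging.DEBUG)` call is an assumption about how debug output like the log above can be enabled, not something stated elsewhere in this post.

```python
import logging

# Settings shared by both attempts (values copied from the client snippets).
COMMON = {
    "bootstrap_servers": ["kafka-bootstrap.mydomain.com:9094"],
    "client_id": "test-consumer",
    "api_version": (2, 5, 0),
}

# Works when the listener has no authentication.
SSL_ONLY = {**COMMON, "security_protocol": "SSL"}

# Fails with "Connection reset during recv" once SCRAM-SHA-512 is enabled.
SASL_SSL = {
    **COMMON,
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": "someusername",
    "sasl_plain_password": "somerandom",
}


def config_diff(a, b):
    """Return {key: (value_in_a, value_in_b)} for keys whose values differ."""
    return {
        key: (a.get(key), b.get(key))
        for key in sorted(set(a) | set(b))
        if a.get(key) != b.get(key)
    }


def make_consumer(config):
    """Hypothetical helper: build a consumer with DEBUG logging enabled."""
    from kafka import KafkaConsumer  # lazy import; requires kafka-python

    logging.basicConfig(level=logging.DEBUG)
    return KafkaConsumer(**config)


if __name__ == "__main__":
    # Print only the settings that changed between the two attempts.
    for key, (before, after) in config_diff(SSL_ONLY, SASL_SSL).items():
        print(f"{key}: {before!r} -> {after!r}")
```

Calling `make_consumer(SSL_ONLY)` corresponds to the working case and `make_consumer(SASL_SSL)` to the failing one, so the only client-side change is the security protocol plus the three SASL settings.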
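Is this caused by a mistake in the Kafka server or client configuration? I have successfully created a KafkaUser resource in the k8s cluster.
Or is it because Kafka needs SSL on the Kafka server side (rather than at the LoadBalancer) for SASL SCRAM-SHA-512 to work? If so, is there a workaround?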