I'm trying to connect to a remote Kafka broker as a consumer over SSL, using the command-line tools from Kafka 0.11.0.3. The command is as follows:

kafka-console-consumer.bat \
    --bootstrap-server host:port \
    --topic topicName \
    --from-beginning \
    --group groupId \
    --consumer.config ssl.properties

The ssl.properties file:

security.protocol=SSL
ssl.truststore.location=path/to/truststore.jks
ssl.truststore.password=1234567
ssl.keystore.location=path/to/keystore.jks
ssl.keystore.password=1234567
ssl.key.password=1234567
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.type=JKS
ssl.keystore.type=JKS

The exception I get:

[2020-04-28 17:24:39,522] ERROR [Consumer clientId=consumer-1, groupId=binomix] Connection to node -1 (/178.208.149.84:9301) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2020-04-28 17:24:39,524] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
        at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1582)
        at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:544)
        at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1216)
        at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1184)
        at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:471)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:448)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:313)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
        at sun.security.ssl.Alerts.getSSLException(Alerts.java:198)
        at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:333)
        at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:325)
        at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1689)
        at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:226)
        at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1082)
        at sun.security.ssl.Handshaker$1.run(Handshaker.java:1015)
        at sun.security.ssl.Handshaker$1.run(Handshaker.java:1012)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1520)
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
        ... 18 more
Caused by: java.security.cert.CertificateException: No subject alternative names present
        at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:145)
        at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
        at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:459)
        at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:440)
        at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:284)
        at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
        at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1676)
        ... 27 more
Processed a total of 0 messages

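Any idea why this happens? Is it a problem with the certificates? I didn't generate them; they were given to me by the owner of the Kafka server. Thanks in advance.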

2 Answers

It looks like the Subject Alternative Name (SAN) is missing from your broker's certificate (e.g. /var/private/ssl/kafka.server.truststore.jks).
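
One way to confirm this from the client side is to look at the certificate the broker actually presents. The following is a minimal sketch, assuming openssl is installed and a POSIX shell is available; the function names cert_san and broker_san are made up for illustration, and host and port stand for the values passed to --bootstrap-server.

```shell
# Print the SAN extension of a PEM certificate read from stdin.
# Prints nothing (and returns non-zero) if the certificate has no SAN.
cert_san() {
  openssl x509 -noout -text | grep -A1 'Subject Alternative Name'
}

# Fetch the certificate a TLS server presents and show its SAN entries.
# $1 = host, $2 = port (hypothetical helper, not part of Kafka).
broker_san() {
  openssl s_client -connect "$1:$2" </dev/null 2>/dev/null | cert_san
}

# Usage: broker_san host port
```

If broker_san prints nothing, the certificate carries no SAN at all, which would match the "No subject alternative names present" message in the stack trace.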

To add a SAN, append the argument -ext SAN=DNS:{FQDN} to the keytool command:

keytool \
    -keystore kafka.server.keystore.jks \
    -alias localhost \
    -validity {validity} \
    -genkey \
    -keyalg RSA \
    -ext SAN=DNS:{FQDN}

Make sure to include the SAN when you create the server's keystore. This is also mentioned in Confluent's security tutorial:

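If hostname verification is enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN)
  2. Subject Alternative Name (SAN)

Both fields are valid, but RFC-2818 recommends the use of SAN. SAN is also more flexible, allowing multiple DNS entries to be declared. Another advantage is that the CN can be set to a more meaningful value for authorization purposes.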


Alternatively, you can choose to disable server hostname verification.

Server hostname verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string.

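So you only need to set the following in server.properties and then restart your Kafka cluster. Note that hostname verification is performed by the connecting side, so for the console consumer in the question the same line belongs in the client's ssl.properties: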

ssl.endpoint.identification.algorithm=
Answered 2020-04-28T14:52:14.600

Note that including the SAN when generating the key pair (and with it the keystore) is not enough.

keytool \
    -keystore kafka.server.keystore.jks \
    -alias {alias} \
    -validity {validity} \
    -genkey \
    -keyalg RSA \
    -ext SAN=DNS:{FQDN}

The SAN also needs to be included when creating the certificate signing request (CSR).

keytool \
    -keystore kafka.server.keystore.jks \
    -alias {alias} \
    -certreq \
    -ext SAN=DNS:{FQDN} \
    -file {csr_filename}

The resulting certificate signing request can then be inspected; it should contain the relevant SAN.

keytool \
    -v -printcertreq -file {csr_filename}

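Later, if openssl's x509 command is used to sign the certificate signing request, take care to explicitly include the X.509 version 3 extensions. The SAN is one such extension, and it will not make it into the final issued certificate unless it is explicitly included. The concept is discussed in detail here: "Subject Alternative Name not present in certificate".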
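
A minimal sketch of such a signing step, assuming openssl and a POSIX shell: the SAN is supplied through a small extensions file passed with -extfile. The function name sign_with_san and all file names are hypothetical, and the validity period and digest are arbitrary choices.

```shell
# Sign a CSR with a CA key and explicitly add the SAN as an X.509 v3 extension.
# $1 = CSR file, $2 = CA certificate, $3 = CA private key,
# $4 = output certificate, $5 = broker FQDN to place in the SAN.
sign_with_san() {
  ext=$(mktemp)
  # With -extfile and no -extensions option, openssl reads the extensions
  # from the unnamed (default) section of the file.
  printf 'subjectAltName=DNS:%s\n' "$5" > "$ext"
  openssl x509 -req -in "$1" -CA "$2" -CAkey "$3" -CAcreateserial \
      -out "$4" -days 365 -sha256 -extfile "$ext"
  rm -f "$ext"
}

# Usage: sign_with_san {csr_filename} ca-cert.pem ca-key.pem cert-signed.pem {FQDN}
```

Running openssl x509 -in cert-signed.pem -noout -text afterwards should show an "X509v3 Subject Alternative Name" section if the extension was carried over.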

Answered 2020-12-24T22:16:35.943