1

社区!

我正在尝试将我的 Mulesoft 应用程序连接到 Heroku 托管的 Kafka 服务器。Anypoint Studio 7 (Mule 4) 有一个 Kafka 连接器,它有几个连接选项(Basic、Kerberos、Kerberos SSL 和 SSL):

Kafka 连接器的设置

根据 Heroku 的文档,它“支持” SSL,但没有提到这是必需的。任何人都可以确认吗?

我看到的 Heroku 文档 [ https://devcenter.heroku.com/articles/kafka-on-heroku]

当我在 Heroku 上设置应用程序时,添加 Kafka AddOn,创建主题,我通过运行命令获取引导服务器heroku config:get KAFKA_URL 如果我尝试进行基本测试,则会收到以下错误:

在此处输入图像描述

错误状态org.mule.runtime.api.connection.ConnectionException: invalid connection! org.mule.runtime.api.connection.ConnectionException: invalid connection! Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

我正在尝试构建的是一个具有 2 个流的 PoC……一个产生到主题的消息,另一个流从中消费。

在此处输入图像描述

任何有关如何设置连接器和 Heroku 环境的帮助都将受到欢迎


更新:

我发现当你在 Heroku 创建应用程序并包含 KAFKA 插件时,你会得到以下变量:

KAFKA_URL:组成集群的 Kafka 代理的 SSL URL 的逗号分隔列表。例子:

kafka+ssl://ec2-3-*****-100.compute-1.amazonaws.com:9096,kafka+ssl://ec2-3-******-127.compute-1 .amazonaws.com:9096

KAFKA_TRUSTED_CERT:代理的 SSL 证书(PEM 格式),用于检查您是否连接到正确的服务器。例子:

-----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1l ZjQwOWUzNy00NjhhLTRiMGEtOGVkOC0wZWYxMmRhYjkyZWEwHhcNMTkxMjEyMTUx NzU5WhcNMjkxMjEyMTUxNzU5WjAyMTAwLgYDVQQDDCdjYS1lZjQwOWUzNy00Njhh

KAFKA_CLIENT_CERT:根据代理对客户端进行身份验证所需的客户端证书(PEM 格式)。例子:

-----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1l ZjQwOWUzNy00NjhhLTRiMGEtOGVkOC0wZWYxMmRhYjkyZWEwHhcNMjAwMTE1MTU1 MjU2WhcNMzAwMTE1MTU1MjU2WjAZMRcwFQYDVQQDDA51NnZtYWVzM2cwZnMyZjCC

KAFKA_CLIENT_CERT_KEY:根据代理对客户端进行身份验证所需的客户端证书密钥(PEM 格式)。Kafka 集群需要使用提供的客户端证书进行身份验证。任何不使用客户端证书的请求都将被拒绝。例子:

-----开始 RSA 私钥----- MIIEpAIBAAKCAQEAmmu+j9DulVnqwIOt02++6Ehw9Mg7KaocdIQjODVtVipC5AyK iaHsdNVh9bgJQAJSfstIV/6O1mCLmjUS/YmyIEEgmBvATFxcldPGBGcpPVSV2R3Q

我假设我需要将它们“转换”为连接器 SSL 配置的 KeyStore 和 TrustStore .. 任何人都可以确认吗?因为对于我从文档中看到的内容,需要 SSL


另一个更新:

我下载了上面描述的证书并使用“密钥资源管理器”工具创建了一个 keystore.jks 并将 KAFKA_CLIENT_CERT 和 KAFKA_CLIENT_CERT_KEY 导入其中,然后我创建了一个 truststore.jsk 并在那里导入了文件 KAFKA_TRUSTED_CERT。在这两种情况下,我都设置了基本密码……看起来不错,但出现错误:

org.mule.runtime.api.connection.ConnectionException: invalid connection!
  org.mule.runtime.api.connection.ConnectionException: invalid connection!
  Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
  Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
    at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1521)
    at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:528)
    at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1197)
    at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1165)
    at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:448)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:313)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1196)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
    at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1709)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:318)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:310)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1639)
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:223)
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1037)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:970)
    at sun.security.ssl.Handshaker$1.run(Handshaker.java:967)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1459)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
    ... 7 more
  Caused by: java.security.cert.CertificateException: No name matching ec2-3-220-121-33.compute-1.amazonaws.com found
    at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
    at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1626)
    ... 16 more

可能与以下原因有关:

java.security.cert.CertificateException:找不到与 ec2-3-220-121-33.compute-1.amazonaws.com 匹配的名称

4

1 回答 1

1

我让它工作。

使用这些命令创建 JKS 文件(需要 HEROKU CLI)

client_key=`heroku config:get KAFKA_CLIENT_CERT_KEY --app <SET_HEROKU_APP_NAME_HERE>`
client_cert=`heroku config:get KAFKA_CLIENT_CERT --app <SET_HEROKU_APP_NAME_HERE>`
trusted_cert=`heroku config:get KAFKA_TRUSTED_CERT --app <SET_HEROKU_APP_NAME_HERE>`
# Write config vars to files.
echo "$client_key" >> keystore.pem
echo -n "$client_cert" >> keystore.pem
echo -n "$trusted_cert" > truststore.pem
# Set passwords
TRUSTSTORE_PASSWORD=<SET_PASSWORD_HERE>
KEYSTORE_PASSWORD=<SET_PASSWORD_HERE>
echo $TRUSTSTORE_PASSWORD
echo $KEYSTORE_PASSWORD
# Import cert.
keytool -importcert -file truststore.pem -keystore kafka.client.truststore.jks -deststorepass $TRUSTSTORE_PASSWORD -noprompt
# Create PKCS12 file.
openssl pkcs12 -export -in keystore.pem -out keystore.pkcs12 -password pass:$KEYSTORE_PASSWORD
# Create jks files.
keytool -importkeystore -srcstoretype PKCS12 \
    -destkeystore kafka.client.keystore.jks -deststorepass $KEYSTORE_PASSWORD \
    -srckeystore keystore.pkcs12 -srcstorepass $KEYSTORE_PASSWORD

然后使用 SSL 选项配置连接器并添加以下 INLINE:

在此处输入图像描述

于 2020-01-16T16:08:46.357 回答