I am setting up a (very simple) POC using Mule 4 and Confluent Cloud.

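I cannot establish a successful connection with the latest version of the Mule 4 Apache Kafka connector (4.5.0). If I downgrade it to 3.0.7 and use the same configuration, it works fine. Why is that?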

The working 3.0.7 configuration (for a basic producer) looks like this:

<kafka:kafka-producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="2ba6262d-2ff8-4282-910e-5c9e3d347d50" >
    <kafka:basic-kafka-producer-connection bootstrapServers="${kafka.bootstrapserver}" >
        <kafka:additional-properties >
            <kafka:additional-property key="sasl.jaas.config" value="org.apache.kafka.common.security.plain.PlainLoginModule required username='${kafka.key}' password='${kafka.secret}';" />
            <kafka:additional-property key="ssl.endpoint.identification.algorithm" value="https" />
            <kafka:additional-property key="security.protocol" value="SASL_SSL" />
            <kafka:additional-property key="sasl.mechanism" value="PLAIN" />
            <kafka:additional-property key="serviceName" value="kafka" />
        </kafka:additional-properties>
    </kafka:basic-kafka-producer-connection>
</kafka:kafka-producer-config>

The failing 4.5.0 configuration (also for a basic producer) looks like this:

<kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="7aa22dcc-7895-4254-ba51-e8bc5e2e9c2e" >
    <kafka:producer-sasl-plain-connection username="${kafka.key}" password="${kafka.secret}" endpointIdentificationAlgorithm="https">
        <kafka:bootstrap-servers >
            <kafka:bootstrap-server value="${kafka.bootstrapserver}" />
        </kafka:bootstrap-servers>
    </kafka:producer-sasl-plain-connection>
</kafka:producer-config>

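As you can see, both of them:

  • use a SASL PLAIN connection
  • set the SSL endpoint identification algorithm to https
  • specify the same bootstrap server, API key and secret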

There is almost nothing else in the flow apart from an HTTP Listener and a Set Payload.

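Messages sent with the earlier connector version arrive at the Confluent Cloud topic without problems, but with 4.5.0 the application fails to start and repeatedly prints errors such as: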

org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Found least loaded connecting node pkc-4vndj.australia-southeast1.gcp.confluent.cloud:9092 (id: -1 rack: null)
org.mule.runtime.module.extension.internal.runtime.config.LifecycleAwareConfigurationInstance.testConnectivity:179 @23ad5b4f] [processor: ; event: ] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Node -1 disconnected.
org.mule.runtime.module.extension.internal.runtime.config.LifecycleAwareConfigurationInstance.testConnectivity:179 @23ad5b4f] [processor: ; event: ] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Connection to node -1 (xxxx.australia-southeast1.gcp.confluent.cloud/35.244.90.132:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Bootstrap broker pkc-4vndj.australia-southeast1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Cancelled request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-connectivity-1, correlationId=17) due to node -1 being disconnected
org.apache.kafka.common.network.Selector: [Producer clientId=producer-1] Connection with xxxxx.australia-southeast1.gcp.confluent.cloud/35.244.90.132 disconnected

along with a stack trace for an End of File Exception:

java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:120) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:470) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:560) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:248) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:176) ~[kafka-clients-2.7.0.jar:?]

Looking at the Apache source code, this appears to correspond to a zero-byte message response.

1 Answer

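Version 4.5.0 may not be building and setting the org.apache.kafka.common.security.plain.PlainLoginModule property (sasl.jaas.config) that Confluent Cloud needs in order to authenticate the request.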
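
To make that concrete, here is a minimal sketch of the client properties that the working 3.0.7 configuration in the question spells out explicitly, written with nothing but java.util.Properties. The class name, method names and placeholder values are hypothetical and are not part of the connector. Whether 4.5.0 ends up with the same set is exactly what has not been verified here. The non-standard serviceName entry from the 3.0.7 config is left out.

```java
import java.util.Properties;

public class ConfluentSaslProps {

    // Builds the JAAS line in the same shape as the 3.0.7 XML in the question.
    static String jaasConfig(String apiKey, String apiSecret) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username='" + apiKey + "'"
                + " password='" + apiSecret + "';";
    }

    // Collects the security-related settings the 3.0.7 config passes
    // as additional properties, plus the bootstrap server.
    static Properties build(String bootstrapServers, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("ssl.endpoint.identification.algorithm", "https");
        props.setProperty("sasl.jaas.config", jaasConfig(apiKey, apiSecret));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; substitute your own bootstrap server and API key/secret.
        Properties props = build("broker.example:9092", "MY_KEY", "MY_SECRET");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If the Kafka client's effective producer configuration shows up in your application logs at startup, comparing it with this list (security.protocol and sasl.mechanism in particular) should show which setting the 4.5.0 connection is not passing through.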

Answered 2021-03-04T14:30:37.563