-2

如何实现使用 kafka 代理和用户名和密码但不使用 ssl 证书的骆驼 kafka 生产者和消费者。

我的 kafka 代理配置看起来像这样

{
    "kafka_brokers_sasl":
    "kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093",
    "kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093"
  ,
  "user": "**********",
  "password": "********"
}
4

1 回答 1

4

为了连接到 Message Hub,您至少需要 Apache Camel 2.20。

然后将以下查询参数添加到您的 URI:

  • saslMechanism=PLAIN
  • 安全协议=SASL_SSL
  • sslProtocol=TLSv1.2
  • sslEnabledProtocols=TLSv1.2
  • saslJaasConfig=org.apache.kafka.common.security.plain.PlainLoginModule 需要用户名=“用户名”密码=“密码”;

例如,对于生产者:

final String brokers = "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093";

final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";

from("direct:in")
.to("kafka:test?" 
    + "brokers=" + brokers
    + "&saslMechanism=PLAIN"  
    + "&securityProtocol=SASL_SSL"
    + "&sslProtocol=TLSv1.2"
    + "&sslEnabledProtocols=TLSv1.2" 
    + "&sslEndpointAlgorithm=HTTPS"
    + "&saslJaasConfig=" + saslJaasConfig);

和消费者:

final String brokers = "kafka01-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka02-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka03-orgName.services.orgsj001.us-west.bluemix.net:9093,kafka04-orgName.services.orgsj001.us-west.bluemix.net:9093";

final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";

from("kafka:test?" 
    + "brokers=" + brokers
    + "&saslMechanism=PLAIN"  
    + "&securityProtocol=SASL_SSL"
    + "&sslProtocol=TLSv1.2"
    + "&sslEnabledProtocols=TLSv1.2" 
    + "&sslEndpointAlgorithm=HTTPS"
    + "&saslJaasConfig=" + saslJaasConfig
    + "&groupId=mygroup")
.to("stream:out");
于 2018-04-30T08:54:39.327 回答