I have successfully implemented an end-to-end local setup in Docker using the Web Tracker, Scala Stream Kafka Collector, Stream Enrich, Druid, and Metabase.
Before building a working staging environment in Kubernetes with Helm, I wanted to try connecting the Scala Stream Kafka Collector to our Kafka Confluent Cloud account. However, I am running into a SASL authentication issue. The documentation on this topic is very sparse and simply points us at the Kafka documentation. Here is my config.hocon configuration -
sink {
  enabled = kafka
  brokers = "our-domain.confluent.cloud:9092"
  retries = 0
  # The kafka producer has a variety of possible configuration options defined at
  # https://kafka.apache.org/documentation/#producerconfigs
  # Some values are set to other values from this config by default:
  # "bootstrap.servers" = brokers
  # "buffer.memory" = buffer.byteLimit
  # "linger.ms" = buffer.timeLimit
  producerConf {
    "sasl.jaas.config" = "org.apache.kafka.common.security.plain.PlainLoginModule required username='1234567890' password='our-confluent-api-secret';"
    "security.protocol" = "SASL_SSL"
    "sasl.mechanisms" = "PLAIN"
  }
}
However, when the container starts, the configuration in the output does not match -
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SASL_SSL
In addition, I get the following error in the console -
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:431)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:299)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.createProducer(KafkaSink.scala:58)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KafkaSink.<init>(KafkaSink.scala:34)
at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector$.main(KafkaCollector.scala:29)
at com.snowplowanalytics.snowplow.collectors.scalastream.KafkaCollector.main(KafkaCollector.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:439)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:420)
... 5 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:104)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
... 10 more
Searching the usual channels for help suggests this error occurs when the JAAS configuration is not visible to the producer, so I am fairly confident this is just a configuration problem on my end. Unless I am missing something, I would expect SASL authentication to be available to the producer, since the config file indicates that the producer configuration options are supported:
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
Does anyone have experience with this issue?
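One detail I noticed while writing this up, in case it is relevant: the Kafka Java client documents the mechanism property under the singular name `sasl.mechanism` (the plural `sasl.mechanisms` is the librdkafka spelling), and the startup dump above shows `sasl.mechanism = GSSAPI`, i.e. the Kerberos default, as if my key was never picked up. A producerConf sketch using the Java-client property names would look like this (credentials are placeholders, not real values):

```hocon
producerConf {
  # Java-client property name is the singular "sasl.mechanism";
  # if it is absent the producer falls back to the GSSAPI (Kerberos) default.
  "sasl.mechanism"    = "PLAIN"
  "security.protocol" = "SASL_SSL"
  "sasl.jaas.config"  = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<api-key>' password='<api-secret>';"
}
```

I have not yet confirmed whether this spelling alone resolves the error, so corrections are welcome.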