0

我正在尝试对 AWS MSK 集群使用 kafka 休息代理。

MSK 加密详细信息:

集群内

TLS 加密:启用

客户和经纪人之间

TLS 加密:启用

明文:未启用

我在 MSK 上创建了主题“TestTopic”,然后在与 MSK 相同的 VPC 中创建了另一个 EC2 实例,以作为 Rest 代理。以下是来自 kafka-rest.properties 的详细信息:

zookeeper.connect=z-3.msk.xxxx.xx.xxxxxx-1.amazonaws.com:2181,z-1.msk.xxxx.xx.xxxxxx-1.amazonaws.com:2181
bootstrap.servers=b-1.msk.xxxx.xx.xxxxxx-1.amazonaws.com:9096,b-2.msk.xxxx.xx.xxxxxx-1.amazonaws.com:9096
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks

我还创建了包含以下内容的 rest-jaas.properties 文件:

KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="username"
  password="password";
};

然后使用以下方法设置 java.security.auth.login.config:

export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ec2-user/confluent-6.1.1/rest-jaas.properties

在此之后,我使用以下方法启动了 Kafka 休息代理:

./kafka-rest-start /home/ec2-user/confluent-6.1.1/etc/kafka-rest/kafka-rest.properties

但是,当我尝试通过从邮递员调用服务来在 TestTopic 上放置一个事件时: POST: http://IP_of_ec2instance:8082/topics/TestTopic 我收到 500 错误。但在 EC2 实例中,我可以看到错误:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at io.confluent.kafkarest.ProducerPool.buildNoSchemaProducer(ProducerPool.java:120)
    at io.confluent.kafkarest.ProducerPool.buildBinaryProducer(ProducerPool.java:106)
    at io.confluent.kafkarest.ProducerPool.<init>(ProducerPool.java:71)
    at io.confluent.kafkarest.ProducerPool.<init>(ProducerPool.java:60)
    at io.confluent.kafkarest.ProducerPool.<init>(ProducerPool.java:53)
    at io.confluent.kafkarest.DefaultKafkaRestContext.getProducerPool(DefaultKafkaRestContext.java:54)
    ... 64 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:141)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:106)
    at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:92)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:139)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:74)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:120)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:449)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
    ... 71 more

我还可以在 ProducerConfig 值中看到 sasl.jaas.config = null 的值。

有人可以帮我解决这个问题。提前致谢!

4

1 回答 1

0

终于解决了这个问题。我在这里更新修复程序,以便对某人有益:

kafka-rest.properties 文件应包含以下文本:

zookeeper.connect=z-3.msk.xxxx.xx.xxxxxx-1.amazonaws.com:2181,z-1.msk.xxxx.xx.xxxxxx-1.amazonaws.com:2181
bootstrap.servers=b-1.msk.xxxx.xx.xxxxxx-1.amazonaws.com:9096,b-2.msk.xxxx.xx.xxxxxx-1.amazonaws.com:9096
client.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="username";
client.security.protocol=SASL_SSL
client.sasl.mechanism=SCRAM-SHA-512

既不需要创建文件 rest-jaas.properties 也不需要导出 KAFKA_OPTS。

在这些更改之后,我能够使用 scram 身份验证将消息放入 kafka 主题中。

于 2021-06-13T10:23:09.773 回答