0

我在 AWS 上运行 MSK,我想使用 AWS_MSK_IAM 身份验证来使用信息。

我的 MSK 配置正确,我可以使用 Kafka CLI 使用以下命令使用信息:

../bin/kafka-console-consumer.sh --bootstrap-server b-1.kafka.*********.***********.amazonaws.com:9098 --consumer.config client_auth.properties --topic TopicTest --from-beginning

我的 client_auth.properties 有以下信息:

# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

当我尝试使用 spark 从我的 Databricks 集群中消费时,我收到以下错误:

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler

这是我的集群配置: 在此处输入图像描述

我在集群中使用的库:

在此处输入图像描述

我在 Databricks 上运行的代码:

raw = (
    spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'b-.kafka.*********.***********.amazonaws.com:9098')
        .option('subscribe', 'TopicTest') 
        .option('startingOffsets', 'earliest')
        .option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;')
        .option('kafka.sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler')
        .load()
)
4

0 回答 0