我正在尝试配置 kafka-connect 以将我的数据从 kafka 发送到 s3。我是kafka方面的新手,我试图在没有任何ssl加密的情况下实现这个流程只是为了掌握它。
kafka version : 2.12-2.2.0
kafka-connect : 4.1.1 (https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/4.1.1/archive)
在server.properties
文件中,我所做的唯一更改是将 设置advertised.listeners
为我的 ec2 IP:
advertised.listeners=PLAINTEXT://ip:9092
卡夫卡连接属性:
# Kafka broker IP addresses to connect to
bootstrap.servers=localhost:9092
# Path to directory containing the connector jar and dependencies
plugin.path=/root/kafka_2.12-2.2.0/plugins/
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
security.protocol=SASL_PLAINTEXT
consumer.security.protocol=SASL_PLAINTEXT
我的s3-sink.properties
文件:
name=s3.sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my_topic
s3.region=us-east-1
s3.bucket.name=my_bucket
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
我正在使用以下命令启动 kafka-connect:
connect-standalone.sh kafka-connect.properties s3-sink.properties
起初我收到以下错误:
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
从其他帖子中我看到我需要创建一个 jaas 配置文件,以便我所做的:
cat config/kafka_server_jass.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="userName"
serviceName="kafka"
password="password";
};
和 :
export KAFKA_OPTS="-Djava.security.auth.login.config=/root/kafka_2.12-2.2.0/config/kafka_server_jass.conf"
现在我收到以下错误:
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
帮助 :)