
I am trying to configure kafka-connect to send my data from Kafka to S3. I am new to Kafka, and I'm trying to implement this flow without any SSL encryption first, just to get the hang of it.

kafka version : 2.12-2.2.0
kafka-connect : 4.1.1 (https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/4.1.1/archive)

The only change I made in the server.properties file is setting advertised.listeners to my EC2 IP:

advertised.listeners=PLAINTEXT://ip:9092

My kafka-connect properties:

# Kafka broker IP addresses to connect to
bootstrap.servers=localhost:9092

# Path to directory containing the connector jar and dependencies
plugin.path=/root/kafka_2.12-2.2.0/plugins/

# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
security.protocol=SASL_PLAINTEXT
consumer.security.protocol=SASL_PLAINTEXT
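
(One thing worth noting about a setup like this: security.protocol only selects the transport, while the SASL mechanism itself defaults to GSSAPI, i.e. Kerberos. If the broker listener actually uses SASL/PLAIN, the worker and its embedded consumer typically also need the mechanism set explicitly — a sketch, assuming a SASL/PLAIN listener:

# assumed additions for a SASL/PLAIN broker listener
sasl.mechanism=PLAIN
consumer.sasl.mechanism=PLAIN

Without these, the client attempts Kerberos authentication against the broker.)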

My s3-sink.properties file:

name=s3.sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my_topic
s3.region=us-east-1
s3.bucket.name=my_bucket
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE

I am starting kafka-connect with the following command:

connect-standalone.sh kafka-connect.properties s3-sink.properties

At first I got the following error:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

From other posts I saw that I needed to create a JAAS config file, so I did:

cat config/kafka_server_jass.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="userName"
serviceName="kafka"
password="password";
};

and:

export KAFKA_OPTS="-Djava.security.auth.login.config=/root/kafka_2.12-2.2.0/config/kafka_server_jass.conf"

Now I am getting the following error:

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login

Help :)


1 Answer


You probably also need to define principal and keytab in your JAAS configuration:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="userName"
  serviceName="kafka"
  password="password"
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka_server.keytab"
  principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};
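
As an alternative to a separate JAAS file, Kafka clients from 0.10.2 onward also accept the JAAS entry inline via the sasl.jaas.config property, which avoids the KAFKA_OPTS export entirely. A sketch for the worker properties, assuming the same SASL/PLAIN credentials as above:

# assumed inline-JAAS alternative to the external config file
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="userName" \
  password="password";
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="userName" \
  password="password";

Note the trailing semicolon is part of the JAAS entry and is required.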
answered 2020-04-13T10:31:43.907