我有以下数据框架构
root
|-- sentence: string (nullable = true)
|-- category: string (nullable = true)
我正在将上述数据框成功写入 kafka 主题(s3.topic)。接下来我想从 kafka 读取 s3.topic 并将数据存储在 s3 存储桶中。为此,我正在使用 kafka 连接器。我在 connect-distributed.properties 文件中进行了以下配置更改
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
以下是我用来写入 s3 存储桶的 curl 命令
curl -X POST \
localhost:8084/connectors \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-d '{
"name": "S3SinkConnector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "<region>",
"flush.size": "1000",
"topics": "s3.topic",
"tasks.max": "1",
"aws.secret.access.key": "<key>",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"aws.access.key.id": "<keyId>",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "<s3_bucket_name>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
我收到以下错误
错误 WorkerSinkTask{id=S3SinkConnector904-0} 在偏移量 0 和时间戳 1617585788233 的主题“emoji.analysis009”分区 0 中转换消息值时出错:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。(org.apache.kafka.connect.runtime.WorkerSinkTask:547) org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。在 org.apache.kafka 的 org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)。
如何使用上面显示的 printschema 将我的主题数据写入 s3 存储桶?
更新
以下是写入 kafka 主题的数据帧的 printSchema() 输出
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)