1

我有以下数据框架构

root
 |-- sentence: string (nullable = true)
 |-- category: string (nullable = true)

我正在将上述数据框成功写入 kafka 主题(s3.topic)。接下来我想从 kafka 读取 s3.topic 并将数据存储在 s3 存储桶中。为此,我正在使用 kafka 连接器。我在 connect-distributed.properties 文件中进行了以下配置更改

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

以下是我用来写入 s3 存储桶的 curl 命令

curl -X POST \
   localhost:8084/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
  "name": "S3SinkConnector",
  "config": {
   "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "<region>",
  "flush.size": "1000",
  "topics": "s3.topic",
  "tasks.max": "1",
  "aws.secret.access.key": "<key>",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "aws.access.key.id": "<keyId>",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "<s3_bucket_name>",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'

我收到以下错误

错误 WorkerSinkTask{id=S3SinkConnector904-0} 在偏移量 0 和时间戳 1617585788233 的主题“emoji.analysis009”分区 0 中转换消息值时出错:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。(org.apache.kafka.connect.runtime.WorkerSinkTask:547) org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。在 org.apache.kafka 的 org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)。

如何使用上面显示的 printschema 将我的主题数据写入 s3 存储桶?

更新

以下是写入 kafka 主题的数据帧的 printSchema() 输出

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
4

0 回答 0