1

以 JSON 格式从 Kafka 生产/消费。使用以下属性以 JSON 格式保存到 HDFS:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

制片人:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
      --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"

消费者 :

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

问题一:

key.converter.schemas.enable=true

value.converter.schemas.enable=true

获得例外:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)

问题 2:

启用以上两个属性不会引发任何问题,但不会通过 hdfs 写入数据。

任何建议将不胜感激。

谢谢

4

2 回答 2

2

转换器是指如何将数据从 Kafka 主题转换为由连接器解释并写入 HDFS。HDFS 连接器仅支持开箱即用的 avro 或 parquet 写入 HDFS。您可以在此处找到有关如何将格式扩展为 JSON 的信息。如果您进行这样的扩展,我鼓励您将其贡献给连接器的开源项目。

于 2016-11-24T02:33:50.803 回答
1

要写入HDFS的输入Json格式消息,请设置以下属性

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
于 2017-07-11T07:57:29.407 回答