我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保留在 S3 中。
背景
官方S3-Sink-Connector支持 Parquet 输出格式,但是:
对于此连接器,您必须将 AvroConverter、ProtobufConverter 或 JsonSchemaConverter 与 ParquetFormat 一起使用。尝试使用 JsonConverter(带或不带模式)会导致 NullPointerException 和 StackOverflowException。
如果消息不是使用 JSON Schema serialization 写入的,则 JsonSchemaConverter 会抛出错误。
问题陈述
因此,我正在寻找一种方法来从最初以 JSON 格式编写的 Kafka 主题读取消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入将以 Parquet 格式写入 S3 的 S3 连接器。
或者,鉴于主要要求(获取 Kafka 消息,将其作为 Parquet 文件放入 S3),我也愿意接受替代解决方案(-不涉及编写 JAVA 代码)。谢谢!
PS:不幸的是,改变这些 Kafka 消息最初的编写方式(例如使用带有Schema Discovery的JSON Schema 序列化)目前不是我的选择。