我正在构建一个 flink 应用程序,它从 kafka 主题中读取数据,应用一些转换并写入 Iceberg 表。
我从 kafka 主题(在 json 中)读取数据并使用 circe 将其解码为 scala 案例类,其中包含 scala 选项值。数据流上的所有转换都可以正常工作。
案例类如下所示
Event(app_name: Option[String], service_name: Option[String], ......)
但是当我尝试将流转换为表以写入冰山表时,由于案例类,列将转换为原始类型,如下所示。
table.printSchema()
service_name RAW('scala.Option', '...'),
conversion_id RAW('scala.Option', '...'),
......
并且表写入失败如下。
Query schema: [app_name: RAW('scala.Option', '...'), .........
Sink schema: [app_name: STRING, .......
flink table api 是否支持带有选项值的 scala 案例类? https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types
我发现本文档中的数据流支持它。
有没有办法在 Table API 中做到这一点。
在此先感谢您的帮助..