0

我正在构建一个 flink 应用程序,它从 kafka 主题中读取数据,应用一些转换并写入 Iceberg 表。

我从 kafka 主题(在 json 中)读取数据并使用 circe 将其解码为 scala 案例类,其中包含 scala 选项值。数据流上的所有转换都可以正常工作。

案例类如下所示

Event(app_name: Option[String], service_name: Option[String], ......)

但是当我尝试将流转换为表以写入冰山表时,由于案例类,列将转换为原始类型,如下所示。

table.printSchema()

service_name RAW('scala.Option', '...'),
conversion_id RAW('scala.Option', '...'),
......

并且表写入失败如下。

Query schema: [app_name: RAW('scala.Option', '...'), .........
Sink schema: [app_name: STRING, .......

flink table api 是否支持带有选项值的 scala 案例类? https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types

我发现本文档中的数据流支持它。

有没有办法在 Table API 中做到这一点。

在此先感谢您的帮助..

4

1 回答 1

0

Table API 的类型系统比 DataStream API 的类型系统更严格。不受支持的类会立即被视为黑盒类型RAW。这允许对象仍然通过 API,但可能不是每个连接器都支持它。

从例外情况来看,您似乎使用 声明了接收器表app_name: STRING,所以我想您可以使用字符串表示形式。如果是这种情况,我建议实现一个用户定义的函数来执行到字符串的转换。

于 2022-02-21T09:24:29.333 回答