我正在尝试从从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每条记录都有一个Result
对象,其中有一个名为 的对象数组data
。我正在尝试value
从第一个数组元素中提取字段,即data[0]
.
{
'ID': 'some-id',
'Result': {
'data': [
{
'value': 65537,
...
...
}
]
}
}
我正在使用 Table API 从 Kafka 主题中读取数据并将提取的字段写入另一个主题。
源DDL如下:
source_ddl = """
CREATE TABLE InTable (
`ID` STRING,
`Timestamp` TIMESTAMP(3),
`Result` ROW(
`data` ROW(`value` BIGINT) ARRAY),
WATERMARK FOR `Timestamp` AS `Timestamp`
) WITH (
'connector' = 'kafka',
'topic' = 'in-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
对应的 sink DDL 为:
sink_ddl = """
CREATE TABLE OutTable (
`ID` STRING,
`value` BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'out-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'my-group-id',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
value
这是从数组的第一个元素中提取字段的代码片段:
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
table = t_env.from_path('InTable')
table \
.select(
table.ID,
table.Result.data.at(1).value) \
.execute_insert('OutTable') \
.wait()
execute_insert
当我执行此步骤时,我在步骤中看到以下错误。
py4j.protocol.Py4JJavaError: An error occurred while calling o57.executeInsert.
: scala.MatchError: ITEM($9.data, 1) (of class org.apache.calcite.rex.RexCall)
但是,如果我不提取嵌入的value
,而是提取数组的整行,即table.Result.data.at(1)
并sink_ddl
适当地修改,我就能够正确地获取整行。
任何想法,我错过了什么?感谢您的任何指点!
编辑:这可能是 Flink 中的一个错误,它被https://issues.apache.org/jira/browse/FLINK-22082跟踪。