1

我正在尝试从从 Kafka 收到的 JSON 数据中提取 PyFlink 中的一些嵌套字段。JSON 记录架构如下。基本上,每条记录都有一个Result对象,其中有一个名为 的对象数组data。我正在尝试value从第一个数组元素中提取字段,即data[0].

{
  'ID': 'some-id',
  'Result': {
    'data': [
      {
        'value': 65537,
        ...
        ...
      }
    ]
  }
}

我正在使用 Table API 从 Kafka 主题中读取数据并将提取的字段写入另一个主题。

源DDL如下:

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'in-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""

对应的 sink DDL 为:

sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'out-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group-id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""

value这是从数组的第一个元素中提取字段的代码片段:

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

table = t_env.from_path('InTable')
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()

execute_insert当我执行此步骤时,我在步骤中看到以下错误。

py4j.protocol.Py4JJavaError: An error occurred while calling o57.executeInsert.
: scala.MatchError: ITEM($9.data, 1) (of class org.apache.calcite.rex.RexCall)

但是,如果我不提取嵌入的value,而是提取数组的整行,即table.Result.data.at(1)sink_ddl适当地修改,我就能够正确地获取整行。

任何想法,我错过了什么?感谢您的任何指点!

编辑:这可能是 Flink 中的一个错误,它被https://issues.apache.org/jira/browse/FLINK-22082跟踪。

4

0 回答 0