0

我有一个这样的 JSON 数据对象:

{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}

其中 data 字段本身是一个 JSON 对象字符串。我该如何用 Flink 的 Table API 来表达这种模式(schema)?我尝试创建一个 UDF,它接收 JSON 字符串并输出解析后的内容,但我找不到填充 DataTypes.ROW 对象的方法:

# Register the Kafka-backed source table INPUT_TABLE.
# Each record is a JSON object whose "data" member is itself a JSON document
# encoded as a string, so it is read here as a plain STRING column.
t_env.connect(
    Kafka()
        .version("universal")
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .property("zookeeper.connect", PROD_ZOOKEEPER)
        .start_from_latest()
) \
    .with_format(
    # Every property is declared as "string" so the format schema agrees with
    # the all-STRING table schema below. In particular "state" is "string"
    # (not "integer") to match its STRING column.
    # NOTE(review): assumes the JSON format renders numeric values such as
    # monitorId/state/time as text for string-typed fields -- confirm against
    # the Flink version in use.
    Json()
        .json_schema(
        """
        {
    "type": "object",
    "properties": {
        "monitorId": {
            "type": "string"
        },
        "deviceId": {
            "type": "string"
        },
        "data": {
            "type": "string"
        },
        "state": {
            "type": "string"
        },
        "time": {
            "type": "string"
        }
    }
}
    """
    )
) \
    .with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.STRING())
        .field("state", DataTypes.STRING())
) \
    .register_table_source(INPUT_TABLE)

# Register the Kafka-backed sink table OUTPUT_TABLE.
# The "data" column is a nested row with a single string field "feature1".
t_env.connect(Kafka()
              .version("universal")
              .topic(OUTPUT_TOPIC)
              .property("bootstrap.servers", LOCAL_KAFKA)
              .property("zookeeper.connect", LOCAL_ZOOKEEPER)
              .start_from_latest()
              ) \
    .with_format(
    # The JSON schema mirrors the table schema below: properties are listed in
    # the same order as the table fields (monitorId, time, data), and "data"
    # is declared as an object with a string "feature1" member so that it
    # matches ROW<feature1 STRING> instead of a plain string.
    Json()
        .json_schema(
        """
         {
       "type": "object",
       "properties": {
           "monitorId": {
               "type": "string"
           },
           "time": {
               "type": "string"
           },
           "data": {
               "type": "object",
               "properties": {
                   "feature1": {
                       "type": "string"
                   }
               }
           }
       }
   }
        """
    )
) \
    .with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.ROW([DataTypes.FIELD("feature1", DataTypes.STRING())]))
) \
    .register_table_sink(OUTPUT_TABLE)



class DataConverter(ScalarFunction):
    """Scalar UDF that converts the JSON text of the ``data`` column to a Row.

    The function is registered with result type ROW<feature1 STRING>, so
    ``eval`` returns a ``Row`` holding exactly one string value.
    """

    def eval(self, str_data):
        """Parse ``str_data`` as JSON and wrap one reading in a ``Row``.

        Args:
            str_data: JSON object text such as '{"0001":105.0,"0002":1.21}',
                or None for a NULL column value.

        Returns:
            None when ``str_data`` is None; otherwise a one-field ``Row``
            whose value is the reading as a string (None if the key is absent).

        Raises:
            ValueError: if ``str_data`` is not valid JSON (raised by
                ``json.loads`` as ``json.JSONDecodeError``).
        """
        # Local import keeps this block self-contained; a ROW result type is
        # produced by returning a pyflink Row with one positional value per
        # declared field.
        from pyflink.table.types import Row

        if str_data is None:
            return None

        data = json.loads(str_data)
        # NOTE(review): "0001" is assumed to be the reading that backs
        # "feature1" -- confirm the intended key. To expose more readings,
        # add matching fields to the ROW result type and pass one value per
        # field to Row(...), in the same order.
        value = data.get("0001")
        return Row(None if value is None else str(value))

# Expose DataConverter to the Table API under the name "data_converter":
# it takes one STRING argument and yields ROW<feature1 STRING>.
t_env.register_function(
    "data_converter",
    udf(
        DataConverter(),
        input_types=[DataTypes.STRING()],
        result_type=DataTypes.ROW(
            [DataTypes.FIELD("feature1", DataTypes.STRING())]
        ),
    ),
)

# Read from the source table, convert the "data" column with the UDF and
# write the three selected columns into the sink table.
(
    t_env.from_path(INPUT_TABLE)
    .select("monitorId, time, data_converter(data)")
    .insert_into(OUTPUT_TABLE)
)

# Submit the job.
t_env.execute("IU pyflink job")


4

1 回答 1

0

如果你希望 Python UDF 的结果类型是 DataTypes.ROW,可以用 Python 的 Row 类来包装返回值。可以使用以下代码导入它:from pyflink.table.types import Row。例如,在 eval 中返回 Row(str(data["0001"])),其中每个位置参数依次对应 ROW 中声明的一个字段。

于 2020-07-10T09:47:20.930 回答