我有一个这样的 JSON 数据对象:
{
"monitorId": 865,
"deviceId": "94:54:93:49:96:13",
"data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
"state": 2,
"time": 1593687809180
}
其中字段 data 本身是一个 JSON 对象字符串。我如何用 Flink 的 Table API 来表达这个模式?我尝试创建一个接收 JSON 字符串并输出解析内容的 UDF,但是找不到填充 DataTypes.ROW 对象的方法:
# Source table: consume raw monitor events from the production Kafka
# cluster. The nested "data" payload is kept as a plain JSON string here
# and parsed later by the data_converter UDF.
t_env.connect(
    Kafka()
    .version("universal")
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .property("zookeeper.connect", PROD_ZOOKEEPER)
    .start_from_latest()
) \
    .with_format(
        Json()
        .json_schema(
            """
            {
                "type": "object",
                "properties": {
                    "monitorId": {
                        "type": "string"
                    },
                    "deviceId": {
                        "type": "string"
                    },
                    "data": {
                        "type": "string"
                    },
                    "state": {
                        "type": "integer"
                    },
                    "time": {
                        "type": "string"
                    }
                }
            }
            """
        )
    ) \
    .with_schema(
        Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.STRING())
        # FIX: the JSON format above declares "state" as "integer"; the
        # table type must match, so INT instead of the original STRING.
        .field("state", DataTypes.INT())
        # NOTE(review): the sample event carries monitorId=865 and
        # time=1593687809180 as JSON *numbers*, yet both are declared
        # "string" in the format schema — confirm the producer actually
        # emits them as strings, otherwise declare them integer/BIGINT.
    ) \
    .register_table_source(INPUT_TABLE)
# Sink table: write the transformed events to the local Kafka cluster.
# FIX: the JSON format originally declared "data" as a plain "string"
# while the table schema declares it as a ROW — the format schema and
# the table schema must agree, so "data" is declared as a JSON object
# with a "feature1" property to match DataTypes.ROW([...feature1...]).
t_env.connect(Kafka()
              .version("universal")
              .topic(OUTPUT_TOPIC)
              .property("bootstrap.servers", LOCAL_KAFKA)
              .property("zookeeper.connect", LOCAL_ZOOKEEPER)
              .start_from_latest()
              ) \
    .with_format(
        Json()
        .json_schema(
            """
            {
                "type": "object",
                "properties": {
                    "monitorId": {
                        "type": "string"
                    },
                    "data": {
                        "type": "object",
                        "properties": {
                            "feature1": {
                                "type": "string"
                            }
                        }
                    },
                    "time": {
                        "type": "string"
                    }
                }
            }
            """
        )
    ) \
    .with_schema(
        Schema()
        .field("monitorId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.ROW([DataTypes.FIELD("feature1", DataTypes.STRING())]))
    ) \
    .register_table_sink(OUTPUT_TABLE)
class DataConverter(ScalarFunction):
    """Parse the JSON string held in the ``data`` column into a ROW.

    A Python ``ScalarFunction`` whose ``result_type`` is
    ``DataTypes.ROW([...])`` returns its field values *positionally*:
    either a ``pyflink`` ``Row`` (a ``tuple`` subclass) or a plain
    ``tuple``, ordered exactly like the ``DataTypes.FIELD``s declared in
    the result type. That is how the ROW gets populated.
    """

    def eval(self, str_data):
        # str_data is a JSON object string, e.g. '{"0001":105.0,...}'.
        data = json.loads(str_data)
        # One element per declared FIELD, in declaration order. The
        # result type declares feature1 as STRING, so convert the
        # numeric JSON value to str before returning it.
        # (Equivalently: return Row(str(data["0001"])) — Row is a tuple
        # subclass; add further fields here as more FIELDs are declared.)
        return (str(data["0001"]),)
# Register the UDF: STRING in, ROW out. The ROW's fields must mirror the
# sink table's "data" ROW declaration field-for-field.
t_env.register_function("data_converter", udf(DataConverter(), input_types=[DataTypes.STRING()],
                                              result_type=DataTypes.ROW([
                                                  DataTypes.FIELD("feature1", DataTypes.STRING())
                                              ])))

# FIX: the UDF call originally produced an *unnamed* column, which
# cannot be mapped onto the sink schema's "data" field — alias it with
# "as data" so the projection matches the sink by name.
t_env.from_path(INPUT_TABLE) \
    .select("monitorId, time, data_converter(data) as data") \
    .insert_into(OUTPUT_TABLE)

t_env.execute("IU pyflink job")