I am trying to convert JSON data to Parquet so that I can query it with Trino or Presto. A sample JSON record looks like this:
{"name": "success","message": "test","id": 1, "test1": {"one": 1, "two": 2, "three": "t3"}, "test2": [1,2,3], "test3": [{"a": "a"},{"a": "aa"}], "test4": [{"a": "a"},{"a": "aa"}]}
My Flink code is as follows:
tEnv.executeSql("create TEMPORARY table test (" +
"name string," +
"message string," +
"id int," +
// "test1 string," +
"test1 map<string,string>," +
// "test1 row (`one` int, `two` int, `three` string)," +
"test2 array< int >," +
"test3 array< map<string,string>>," +
"test4 string" +
")" +
// "ts as LOCALTIMESTAMP," +
// "WATERMARK FOR ts AS ts - INTERVAL '10' SECOND)" +
"with (" +
"'connector' = 'filesystem'," +
"'path' = 'file:///Users/successmalla/big_data/flink/src/main/resources/test.json'," +
"'format' = 'json'" +
// "'csv.ignore-parse-errors' = 'true'" +
")");
//
// tEnv.executeSql("select test1['one'] as test_one, test1 " +
//// ", test2[1], test3[2]['a']" +
//// ", * " +
// "from test ")
// .print();
//
tEnv.executeSql("create table test2 (" +
"name string," +
"message string," +
"id int," +
"test1 map<string,string>," +
// "test1 row (`one` int, `two` int, `three` string)," +
// "test2 array< int >," +
// "test3 array< map<string,string>>," +
"test4 string" +
")" +
// "ts as LOCALTIMESTAMP," +
// "WATERMARK FOR ts AS ts - INTERVAL '10' SECOND)" +
"with (" +
"'connector' = 'filesystem'," +
"'path' = 'file:///Users/successmalla/big_data/flink/src/main/resources/testresilt'," +
"'format' = 'parquet'" +
// "'csv.ignore-parse-errors' = 'true'" +
")");
tEnv.executeSql("insert into test2 " +
"select name, message, id, test1, test4 " +
"from test ");
With this, I get the following error:
Caused by: java.lang.UnsupportedOperationException: Unsupported type: MAP<STRING, STRING>
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:105)
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetType(ParquetSchemaConverter.java:43)
at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertToParquetMessageType(ParquetSchemaConverter.java:37)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:72)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.<init>(ParquetRowDataBuilder.java:70)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder.getWriteSupport(ParquetRowDataBuilder.java:67)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:652)
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$FlinkParquetBuilder.createWriter(ParquetRowDataBuilder.java:135)
at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
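I can read and print the data when the columns are declared as map, array, or row types, but I cannot write that data out as Parquet. Thanks in advance.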