0

我正在将一些为 Flink 1.5 版编写的遗留 Java 代码转换为 Flink 1.13.1 版。具体来说,我正在使用 Table API。我必须从 CSV 文件中读取数据,执行一些基本的 SQL,然后将结果写回文件。

对于 Flink 1.5 版本,我使用以下代码执行上述操作

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        
TableSource tableSrc = CsvTableSource.builder()
    .path("<CSV_PATH>")
    .fieldDelimiter(",")
    .field("date", Types.STRING)
    .field("month", Types.STRING)
    ...
    .build();
        
    tableEnv.registerTableSource("CatalogTable", tableSrc);

String sql = "...";
Table result = tableEnv.sqlQuery(sql);
DataSet<Row1> resultSet = tableEnv.toDataSet(result, Row1.class);
resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-Sql Example");

为了将上述代码转换为 Flink 1.13.1 版本,我编写了以下代码

import org.apache.flink.table.api.Table;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(settings);

final String tableDDL = "CREATE TEMPORARY TABLE CatalogTable (" +
    "date STRING, " +
    "month STRING, " +
    "..." +
    ") WITH (" +
    "'connector' = 'filesystem', " +
    "'path' = 'file:///CSV_PATH', " +
    "'format' = 'csv'" +
    ")";

tableEnv.executeSql(tableDDL);

String sql = "...";
Table result = tableEnv.sqlQuery(sql);

// DEPRECATED - BatchTableEnvironment required to convert Table to Dataset
BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
DataSet<Row1> resultSet = bTableEnv.toDataSet(result, Row1.class);

resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-Sql Example");

但是,BatchTableEnvironment在 Flink 1.13 版本中被标记为“已弃用”。是否有任何替代方法可以转换TableDataset或直接将 a 写入Table文件?

4

0 回答 0