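I am converting some legacy Java code written for Flink 1.5 to Flink 1.13.1. Specifically, I am using the Table API. I need to read data from a CSV file, run some basic SQL on it, and then write the results back to a file.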
For Flink 1.5, I used the following code to do this:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
TableSource tableSrc = CsvTableSource.builder()
.path("<CSV_PATH>")
.fieldDelimiter(",")
.field("date", Types.STRING)
.field("month", Types.STRING)
...
.build();
tableEnv.registerTableSource("CatalogTable", tableSrc);
String sql = "...";
Table result = tableEnv.sqlQuery(sql);
DataSet<Row1> resultSet = tableEnv.toDataSet(result, Row1.class);
resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-Sql Example");
To convert the above code to Flink 1.13.1, I wrote the following:
import org.apache.flink.table.api.Table;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode()
.build();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(settings);
final String tableDDL = "CREATE TEMPORARY TABLE CatalogTable (" +
// `date` and `month` are reserved keywords in Flink SQL, so they are quoted with backticks
"`date` STRING, " +
"`month` STRING, " +
"..." +
") WITH (" +
"'connector' = 'filesystem', " +
"'path' = 'file:///CSV_PATH', " +
"'format' = 'csv'" +
")";
tableEnv.executeSql(tableDDL);
String sql = "...";
Table result = tableEnv.sqlQuery(sql);
// DEPRECATED: BatchTableEnvironment is required to convert a Table to a DataSet
BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
DataSet<Row1> resultSet = bTableEnv.toDataSet(result, Row1.class);
resultSet.writeAsText("<OUT_PATH>");
env.execute("Flink Table-Sql Example");
However, BatchTableEnvironment is marked as deprecated in Flink 1.13. Is there an alternative way to convert a Table to a DataSet, or to write a Table directly to a file?
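One direction that avoids the DataSet conversion is to declare the output file as a second table that also uses the filesystem connector, and then insert the query result into it. The sketch below only builds the two DDL strings in plain Java. CsvTableDdl, build, quote, ResultTable and OUT_PATH are hypothetical names. All columns are assumed to be STRING, as in the snippets above. Column names are wrapped in backticks because date and month are reserved keywords in Flink SQL, and names are assumed not to contain backticks themselves.

```java
import java.util.StringJoiner;

/**
 * Hypothetical helper (not part of Flink): builds a Flink SQL DDL statement for a
 * CSV-formatted table backed by the 'filesystem' connector.
 */
public class CsvTableDdl {

    /** Wraps an identifier in backticks so reserved words such as DATE or MONTH are accepted. */
    static String quote(String identifier) {
        // Assumes the identifier itself contains no backticks.
        return "`" + identifier + "`";
    }

    /** Builds CREATE TEMPORARY TABLE ... WITH (...); every column is declared as STRING. */
    static String build(String tableName, String path, String... columns) {
        StringJoiner cols = new StringJoiner(", ");
        for (String column : columns) {
            cols.add(quote(column) + " STRING");
        }
        return "CREATE TEMPORARY TABLE " + quote(tableName) + " (" + cols + ") WITH ("
                + "'connector' = 'filesystem', "
                + "'path' = '" + path + "', "
                + "'format' = 'csv')";
    }

    public static void main(String[] args) {
        // Source table from the question (only the two columns shown there).
        System.out.println(build("CatalogTable", "file:///CSV_PATH", "date", "month"));
        // Sink table; "ResultTable" and OUT_PATH are placeholders.
        System.out.println(build("ResultTable", "file:///OUT_PATH", "date", "month"));
    }
}
```

With Flink 1.13.1 on the classpath, each string would be passed to tableEnv.executeSql(...). After that, result.executeInsert("ResultTable").await() would write the query result without going through BatchTableEnvironment or DataSet. Note that the filesystem sink treats 'path' as a directory and writes part files into it, not a single file.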