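I'm having trouble with a Flink job that runs a query against a MySQL database and then tries to create a temporary view that must be accessible from a different job.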
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Column types of the rows returned by the query: INT, STRING, STRING.
    final TypeInformation<?>[] fieldTypes =
        new TypeInformation<?>[] {
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
        };
    final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

    // Connection settings (redacted).
    String selectQuery = "select * from ***";
    String driverName = "***";
    String sourceDb = "***";
    String dbUrl = "jdbc:mysql://mySqlDatabase:3306/";
    String dbPassword = "***";
    String dbUser = "***";

    // Bounded JDBC source: runs the query once and emits each result row.
    JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
        JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername(driverName)
            .setDBUrl(dbUrl + sourceDb)
            .setQuery(selectQuery)
            .setRowTypeInfo(rowTypeInfo)
            .setUsername(dbUser)
            .setPassword(dbPassword);
    DataStreamSource<Row> source = env.createInput(inputBuilder.finish());

    // Convert the stream to a table, rename the columns, and register it as a temporary view.
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table customerTable =
        tableEnv.fromDataStream(source).as("id", "name", "test");
    tableEnv.createTemporaryView("***", ***Table);

    // Query the view and print the result to stdout.
    Table resultTable = tableEnv.sqlQuery(
        "SELECT * FROM ***");
    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
    resultStream.print();

    env.execute();
}
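I'm very new to Flink and am still working through the APIs it provides for all of this, but I can't figure out what I'm doing wrong. Testing the process by printing the result at the end of the job seemed straightforward to me, but the only output I get is this: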
2022-02-14 12:22:57,702 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from INITIALIZING to RUNNING.
2022-02-14 12:22:57,853 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from RUNNING to FINISHED.
2022-02-14 12:22:57,853 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680).
2022-02-14 12:22:57,856 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 8a1cd3aa6a753c9253926027b1332680.
The point of this job is to create a temporary table view that caches some static data, which other Flink jobs would then use by querying that view.