0

我在运行 flink 作业时遇到问题,该作业基本上是对 mysql 数据库运行查询,然后尝试创建一个必须从不同作业访问的临时视图。

public static void main(String[] args) throws Exception {
    
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    final TypeInformation<?>[] fieldTypes =
        new TypeInformation<?>[] {
          BasicTypeInfo.INT_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO
        };

    final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

    String selectQuery = "select * from ***";
    String driverName = "***";
    String sourceDb = "***";
    String dbUrl = "jdbc:mysql://mySqlDatabase:3306/";
    String dbPassword = "***";
    String dbUser = "***";

    JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
        JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername(driverName)
            .setDBUrl(dbUrl + sourceDb)
            .setQuery(selectQuery)
            .setRowTypeInfo(rowTypeInfo)
            .setUsername(dbUser)
            .setPassword(dbPassword);

    DataStreamSource<Row> source = env.createInput(inputBuilder.finish());

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    Table customerTable =
            tableEnv.fromDataStream(source).as("id", "name", "test");

    tableEnv.createTemporaryView("***", ***Table);
    Table resultTable = tableEnv.sqlQuery(
            "SELECT * FROM ***");

    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

    resultStream.print();
    env.execute();

我对 Flink 很陌生,我目前正在研究为所有这些提供的 API,但我实际上无法理解我做错了什么。在我看来,通过在作业结束时打印结果来测试这个过程似乎很简单,但我得到的唯一结果是这样的:

2022-02-14 12:22:57,702 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from INITIALIZING to RUNNING.
2022-02-14 12:22:57,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from RUNNING to FINISHED.
2022-02-14 12:22:57,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680).
2022-02-14 12:22:57,856 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 8a1cd3aa6a753c9253926027b1332680.

该作业的重点是创建一个临时表视图,用于通过查询该表视图来缓存将在其他 Flink 作业中使用的一些静态数据。

4

2 回答 2

1

首先,测试mysql的数据是否可以正常读取可能是可以直接打印源结果如下

DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
source.print()
env.execute();
于 2022-02-14T13:17:21.187 回答
1

有关如何将 MySQL 与 Flink 一起使用的更多上下文,请参阅https://stackoverflow.com/a/71030967/2000823。作为流数据源,将 MySQL 的 write-ahead-log 作为 CDC 流使用更为常见,但有时采用的另一种方法(但 Flink 的 API 不鼓励)是使用 SELECT 查询定期轮询 MySQL。

至于你所尝试的,createInput不鼓励使用流式作业,因为这不适用于 Flink 的检查点机制。与其使用 hadoop 输入格式,不如选择一个可用的源连接器。

临时视图不保存任何数据,也不能从其他作业中访问。Flink 表或视图是元数据,描述了存储在其他地方(例如,在 mysql 或 kafka 中)的数据如何被 Flink 解释为表。您可以将视图存储在目录中,以便多个作业可以共享其定义,但底层数据将保留在外部数据存储中,并且只有视图元数据存储在目录中。

因此,在这种情况下,您编写的作业将创建一个临时视图,该视图仅对该作业可见,其他作业不可见(因为它是临时视图,而不是存储在持久目录中的持久视图)。您的作业的输出不会在日志文件中,而是转到标准输出或*.out每个任务管理器的日志目录中的文件。

于 2022-02-14T16:06:06.487 回答