3

我正在 Apache Flink 中编写流媒体服务。我基本上是使用 org.apache.flink.table.sources.CsvTableSource 从 CSV 文件中挑选数据。以下是相同的代码:

 StreamTableEnvironment streamTableEnvironment = TableEnvironment
                .getTableEnvironment(streamExecutionEnvironment);

    CsvTableSource csvTableSource = CsvTableSource.builder().path(pathToCsvFile)
            .field("XXX0", Types.SQL_TIMESTAMP).field("XXX1", Types.INT)
            .field("XXX2", Types.DECIMAL).field("XXX3", Types.INT).field("XXX4", Types.INT)
            .field("XXX9", Types.DECIMAL).field("XXX5", Types.STRING)
            .field("XXX6", Types.STRING).field("XXX7", Types.STRING).fieldDelimiter(",").lineDelimiter("\n")
            .ignoreFirstLine().ignoreParseErrors().build();

    streamTableEnvironment.registerTableSource("metrics_table", csvTableSource);

    Table selectedMetricTable = streamTableEnvironment.sqlQuery(getSQLQuery(metricsType, metricsGroupingLevel));

    DataStream<Tuple2<Boolean, MetricsTimeSeriesData>> metricStream = streamTableEnvironment
            .toRetractStream(selectedMetricTable, MetricsTimeSeriesData.class);

但它给出以下错误:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.sources.TableSource

以下是maven依赖项:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>

我可以看到上述类的源定义,但我仍然收到此错误。请帮忙?

4

2 回答 2

1

该模块flink-table没有随 flink 二进制发行版一起提供,因此默认情况下它不提供给集群。您可以将该依赖项放入您的集群安装(在文件夹中),请参阅设置\lib的最后一部分,或者您可以将您的作业作为打包该依赖项的 uber-jar 提交,请参阅此处

于 2018-03-06T22:09:45.300 回答
0

我使用的是 Flink 1.8.0 版本,我遇到了同样的问题。我可以通过从我的系统路径指向flink-table_2.12-1.8.0.jar在pom.xml中添加以下依赖项来修复它。

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.12</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>E:\flink-1.8.0-scala_2.12\opt\flink-table_2.12-1.8.0.jar</systemPath>
        </dependency>

希望它会帮助你。

于 2019-06-05T13:50:48.730 回答