
I am trying to integrate HDFS with Flink.

Scala binary version: 2.12,

Flink (cluster) version: 1.10.1

This is the HADOOP_CONF_DIR:

The HDFS configuration is here:

This configuration and the HADOOP_CONF_DIR are identical in the taskmanager as well.

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.10.0</version>
    </dependency>
</dependencies>

I am trying to read Parquet files from HDFS; my sample code is below:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    Types.MessageTypeBuilder builder = Types.buildMessage();

    MessageType messageType = builder
            .required(INT64).named("column1")
            .required(BINARY).as(UTF8).named("column2")
            .required(INT64).named("column3")
            .required(INT64).named("column4")
            .required(BINARY).as(UTF8).named("column5")
            .required(BINARY).named("column6")
            .named("AppendTest");

    ParquetTableSource parquetTableSource = ParquetTableSource.builder()
        .path("hdfs://hdfs:8020/historic_data/data.parquet")
        .withConfiguration(hadoopConf)
        .forParquetSchema(messageType)
        .build();

    tEnv.registerTableSource("datatable", parquetTableSource);

    Table table = tEnv.sqlQuery("select * from datatable");

    DataSet<Row> tempDataSet = tEnv.toDataSet(table, Row.class);

    tempDataSet.print();

    env.execute("Job name - short desc.");

The error is here:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:271)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:807)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:256)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
... 27 more

This is strange, given that the Hadoop uber jar can be seen under the lib folder.
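
One way to check whether Hadoop file system support is actually visible to the Flink client is to try to resolve the hdfs scheme directly through Flink's FileSystem API. This is only a rough sanity-check sketch, assuming the namenode address from the path above (adjust as needed):

    import java.net.URI;

    import org.apache.flink.core.fs.FileSystem;

    // Sketch: FileSystem.get(...) throws UnsupportedFileSystemSchemeException
    // when no Hadoop file system implementation can be loaded for 'hdfs'.
    public class HdfsSchemeCheck {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(URI.create("hdfs://hdfs:8020/"));
            System.out.println("Resolved file system: " + fs.getClass().getName());
        }
    }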

This is how I submit the job:

docker exec -it jobmanager env HADOOP_CONF_DIR=/tmp/hadoopconf flink run -C file:///opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -d /tmp/core-batch-1.0.0.jar

I also tried submitting the job via the Flink UI, but the result is the same.

Any help would be appreciated.

Thanks.


1 Answer


@Aykut, I'm not sure how much this will help, but I have recently been testing Flink 1.12 delivered to HDP3 through a custom Ambari service that I created from the Ambari Flink service on GitHub. When I installed Flink into a basic Ambari cluster with YARN/HDFS installed, everything worked perfectly with all the dependencies. Once the Flink YARN application was running in the YARN cluster, the rest of my application tests ran fine. See Flink on YARN.

These are my Flink user environment variables:

export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.4.0-315/hadoop/conf:/usr/hdp/3.1.4.0-315/hadoop/lib/*:/usr/hdp/3.1.4.0-315/hadoop/.//*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/./:/usr/hdp/3.1.4.0-315/hadoop-hdfs/lib/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/.//*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/lib/*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/.//*:/usr/hdp/3.1.4.0-315/hadoop-yarn/./:/usr/hdp/3.1.4.0-315/hadoop-yarn/lib/*:/usr/hdp/3.1.4.0-315/hadoop-yarn/.//*:/usr/hdp/3.1.4.0-315/tez/*:/usr/hdp/3.1.4.0-315/tez/lib/*:/usr/hdp/3.1.4.0-315/tez/conf:/usr/hdp/3.1.4.0-315/tez/conf_llap:/usr/hdp/3.1.4.0-315/tez/doc:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-2.8-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib:/usr/hdp/3.1.4.0-315/tez/man:/usr/hdp/3.1.4.0-315/tez/tez-api-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-common-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-dag-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-examples-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-history-parser-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-javadoc-tools-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-job-analyzer-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-mapreduce-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-protobuf-history-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-internals-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-library-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-tests-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/ui:/usr/hdp/3.1.4.0-315/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/gcs-connector-1.9.10.3.1.4.0-315-shaded.jar:/usr/hdp/3.1.4.0-315/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-aws-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-datalake-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-hdfs-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.4.0-315/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.4.0-315/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.4.0-315/tez/lib/tez.tar.gz;

This is my execution command:

/opt/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/bin/flink run javaProgram.jar

Here is the sample Java code (ingesting a file from S3 into HDFS via Flink):

    package org.myorg.quickstart;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class S3FlinkIngest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("s3a://s3Bucket/input.xml");
            dataStream.writeAsText("hdfs://hadoopHost:8020/user/flink/ingest.xml", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
            streamExecutionEnvironment.execute("S3 Flink Ingest");
        }
    }
answered 2020-06-23T21:07:47.807