0

我正在尝试在 AWS EMR(带有 Flink 1.4.2 的 5.15 版)上部署我的 flink 作业。但是,我无法从我的流中获得任何输出。我试图创建一个简单的工作:

object StreamingJob1 {
    def main(args: Array[String]) {
        val path = args(0)
        val file_input_format = new TextInputFormat(
            new org.apache.flink.core.fs.Path(path))
        file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
        file_input_format.setNestedFileEnumeration(true)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val myStream: DataStream[String] =
            env.readFile(file_input_format,
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000L)
                .map(s => s.split(",").toString)

        myStream.print()
        // execute program
        env.execute("Flink Streaming Scala")
    }
}

我使用以下命令执行它:

HADOOP_CONF_DIR=/etc/hadoop/conf;flink run -m yarn-cluster -yn 4 -c my.pkg.StreamingJob1 /home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/

没有错误,但屏幕上除了 flink 的 INFO 日志没有输出。

我尝试输出到 Kinesis 流或 S3 文件。没有任何记录。

    myStream.addSink(new BucketingSink[String](output_path))

我还尝试写入 HDFS 文件。在这种情况下,创建了一个文件,但大小 = 0。我确信输入文件已使用简单检查进行处理:

myStream.map(s => {"abc".toInt})

这产生了一个异常。

我在这里想念什么?

4

1 回答 1

0

看起来 stream.print() 不适用于 EMR。

输出到文件:使用HDFS,有时(或大部分时间)我需要等待文件更新。

输出到 Kinesis:我的流名称中有错字。我不知道为什么我没有得到那个流不存在的任何例外。但是,在更正名称后,我收到了预期的消息。

于 2018-07-20T05:16:10.947 回答