我正在尝试在 AWS EMR(带有 Flink 1.4.2 的 5.15 版)上部署我的 flink 作业。但是,我无法从我的流中获得任何输出。我试图创建一个简单的工作:
object StreamingJob1 {
def main(args: Array[String]) {
val path = args(0)
val file_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(path))
file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
file_input_format.setNestedFileEnumeration(true)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val myStream: DataStream[String] =
env.readFile(file_input_format,
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000L)
.map(s => s.split(",").toString)
myStream.print()
// execute program
env.execute("Flink Streaming Scala")
}
}
我使用以下命令执行它:
HADOOP_CONF_DIR=/etc/hadoop/conf;flink run -m yarn-cluster -yn 4 -c my.pkg.StreamingJob1 /home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/
没有错误,但屏幕上除了 flink 的 INFO 日志没有输出。
我尝试输出到 Kinesis 流或 S3 文件。没有任何记录。
myStream.addSink(new BucketingSink[String](output_path))
我还尝试写入 HDFS 文件。在这种情况下,创建了一个文件,但大小 = 0。我确信输入文件已使用简单检查进行处理:
myStream.map(s => {"abc".toInt})
这产生了一个异常。
我在这里想念什么?