我需要从特定路径连续读取文件。
意味着 flink 作业应该不断地轮询指定的位置,并以一定的时间间隔读取将到达该位置的文件。
示例:我在 Windows 机器上的位置是 C:/inputfiles 在下午 2:00 获取文件 file_1.txt,在下午 2:30 获取文件_2.txt,在下午 3:00 获取 file_3.txt。
我用下面的代码对其进行了试验。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
public class ContinuousFileProcessingTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);
String localFsURI = "D:\\FLink\\2021_01_01\\";
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
DataStream<String> inputStream =
env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
soso.print();
soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
env.execute("read and write");
}
}
现在为了在 flink 集群上进行测试,我使用 flink 的 1.9.2 版本启动了 flink 集群,并且我能够实现每隔一段时间连续读取文件的目标。
注意:Flink 的 1.9.2 版本可以在 windows 机器上启动集群。
但是现在我必须将 flink 的版本从 1.9.2 升级到 1.12 。我们使用 docker 在 1.12 上启动集群(与 1.9.2 不同)。
与 Windows 路径不同,我根据 docker 位置更改了文件位置,但上面的程序没有在那里运行。
此外:访问文件不是问题。意味着如果我在开始作业之前放置文件,那么该作业会正确读取这些文件,但如果我在运行时添加任何新文件,则它不会读取这些新添加的文件。
需要帮助才能找到解决方案。
提前致谢。