0

我需要从特定路径连续读取文件。

意味着 flink 作业应该不断地轮询指定的位置,并以一定的时间间隔读取将到达该位置的文件。

示例:我在 Windows 机器上的位置是 C:/inputfiles 在下午 2:00 获取文件 file_1.txt,在下午 2:30 获取文件_2.txt,在下午 3:00 获取 file_3.txt。

我用下面的代码对其进行了试验。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

public class ContinuousFileProcessingTest {

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10);
    String localFsURI = "D:\\FLink\\2021_01_01\\";
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    DataStream<String> inputStream =
            env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
    SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
    soso.print();
    soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
    env.execute("read and write");
}
}

现在为了在 flink 集群上进行测试,我使用 flink 的 1.9.2 版本启动了 flink 集群,并且我能够实现每隔一段时间连续读取文件的目标。

注意:Flink 的 1.9.2 版本可以在 windows 机器上启动集群。

但是现在我必须将 flink 的版本从 1.9.2 升级到 1.12 。我们使用 docker 在 1.12 上启动集群(与 1.9.2 不同)。

与 Windows 路径不同,我根据 docker 位置更改了文件位置,但上面的程序没有在那里运行。

此外:访问文件不是问题。意味着如果我在开始作业之前放置文件,那么该作业会正确读取这些文件,但如果我在运行时添加任何新文件,则它不会读取这些新添加的文件。

需要帮助才能找到解决方案。

提前致谢。

4

2 回答 2

1

尝试将 directoryScanInterval 从示例代码减少到 Duration.ofSeconds(50).toMillis() 并检查 StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC) 模式。

于 2021-08-30T17:55:20.527 回答
0

对于从https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/RuntimeExecutionMode.html引用的 RuntimeExecutionMode

工作代码如下:

公共类 ContinuousFileProcessingTest { 私有静态最终 Logger 日志 = LoggerFactory.getLogger(ReadSpecificFilesFlinkBatch.class);

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10);
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    String localFsURI = "file:///usr/test";
    // create the monitoring source along with the necessary readers.
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
    log.info("format : " + format.toString());
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    log.info("setFilesFilter : " + FilePathFilter.createDefaultFilter().toString());
    log.info("getFilesFilter : " + format.getFilePath().toString());
    DataStream<String> inputStream =
            env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY,  Duration.ofSeconds(50).toMillis());
    SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
    soso.writeAsText("file:///usr/test/completed.txt", FileSystem.WriteMode.OVERWRITE);
    env.execute("read and write");
}        

}

此代码适用于具有 Flink 1.12 和容器文件路径为 file:///usr/test 的 docker 桌面。注意保持并行度至少为 2,以便可以并行处理文件。

于 2021-08-31T08:30:42.503 回答