0

我们有一个来自数据库表的文件路径列表,其中包含创建时间的时间戳。试图弄清楚我们如何使用 db 中的文件路径列表仅将那些文件从 nfs 转发到 kafka sink。

现在,我正在使用带有文件夹根目录的 ContinuousFileMonitoringFunction 的自定义版本,该文件夹将包含 DB 将显示的所有文件。此操作非常缓慢,因为要遍历文件夹以收集有关更新文件的信息,因为该文件夹太大而只有几 TB 的数据。

Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));

DataSet<String> ds  = result.toDataSet();

ds 包含所有应该发送到 kafka 的文件路径。

以下是我计划实施的想法。但是考虑到 flink 并行性、flink 库支持等,有没有更有效的方法?

public class FileContentMap extends RichFlatMapFunction<String, String> {

      

    @Override
    public void flatMap(String input, Collector<String> out) throws Exception {

       
       
        // get the file path
        String filePath = input;

        String fileContent = readFile(input);

    out.collect(fileCOntent);

       
    }

    @Override
    public void open(Configuration config) {
       
    }
}

DataSet<String> contectDataSet = ds.map(new FileCOntentMap());

contectDataSet.addSink(kafkaProducer);
4

1 回答 1

0

你的方法对我来说似乎很好。也许更有效的方法是创建一个RichParallelSourceFunction,在open()方法中调用数据库以获取已更新的文件列表,然后构建该特定源子任务的文件的内存列表(类似filePath.hashCode() % numSubTasks == mySubTask) 的东西应该发出以由您的FileContentMap.

于 2020-08-18T20:15:49.500 回答