0

我正在编写一个程序,每天将我们所有配置单元表从特定数据库上传到 s3。然而,这个数据库包含许多年前的记录,对于完整的副本/distcp 来说太大了。

我想在 HDFS 中搜索包含 db 的整个目录,并且只获取 last_modified_date 在指定(输入)日期之后的文件

然后我会将这些匹配文件的完整 distcp 到 s3。(如果我需要将匹配文件的路径/名称复制到一个单独的文件中,然后从这个额外的文件中复制 distcp,那也可以。)

在网上看,我发现我可以使用-t标志按文件的最后修改日期对文件进行排序,所以我从这样的东西开始:hdfs dfs -ls -R -t <path_to_db>,但这还不够。它正在打印 500000 个文件,我仍然需要弄清楚如何修剪这些输入日期之前的文件......

编辑:我正在编写一个 Python 脚本,很抱歉最初没有澄清!

编辑pt2:我应该注意我需要遍历几千个,甚至几十万个文件。我已经编写了一个基本脚本来尝试解决我的问题,但是运行它需要非常长的时间。需要一种方法来加快进程....

4

2 回答 2

0

您可以使用 WebHDFS 提取完全相同的信息: https ://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html

这可能更适合与 Python 一起使用。

例子

文件/目录的状态 提交 HTTP GET 请求。

curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
 The client receives a response with a FileStatus JSON object:

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

{
  "FileStatus":
  {
    "accessTime"      : 0,
    "blockSize"       : 0,
    "group"           : "supergroup",
    "length"          : 0,             //in bytes, zero for directories
    "modificationTime": 1320173277227,
    "owner"           : "webuser",
    "pathSuffix"      : "",
    "permission"      : "777",
    "replication"     : 0,
    "type"            : "DIRECTORY"    //enum {FILE, DIRECTORY}
  }
}

列出目录 提交 HTTP GET 请求。

curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
 The client receives a response with a FileStatuses JSON object:

HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 427

{
  "FileStatuses":
  {
    "FileStatus":
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
  }
}
于 2022-01-28T20:53:52.097 回答
0

我不确定您是否使用 Java,但这里有一个可以做什么的示例: . 我为使用 lastModified 做了一些小的修改。

import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;


// For Date Conversion from long to human readable.
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class FileStatusChecker {
    public static void main (String [] args) throws Exception {
        try{
            FileSystem fs = FileSystem.get(new Configuration());
            String hdfsFilePath = "hdfs://My-NN-HA/Demos/SparkDemos/inputFile.txt";
            FileStatus[] status = fs.listStatus(new Path(hdfsFilePath));  // you need to pass in your hdfs path

            for (int i=0;i<status.length;i++){
                long lastModifiedTimeLong = status[i].lastModified();
                Date lastModifiedTimeDate = new Date(lastModifiedTimeLong);
                DateFormat df = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
                System.out.println("The file '"+ hdfsFilePath + "' was accessed last at: "+ df.format(lastModifiedTimeDate));
            }
        }catch(Exception e){
            System.out.println("File not found");
            e.printStackTrace();
        }
    }
}

它将使您能够创建文件列表并用它们做“事情”。

于 2022-01-27T18:51:35.100 回答