3

我有1000 files,每个文件的大小为1GB. 我需要在所有这些中找到一个字符串,1000 files以及哪些文件包含那个特定的字符串。我正在使用 Hadoop 文件系统,所有这些1000 files都在 Hadoop 文件系统中。

所有的1000 files都在真实文件夹下,所以如果我在下面这样做,我将获得所有1000 files. 我需要在真实文件夹下找到哪些文件包含特定的字符串hello 。

bash-3.00$ hadoop fs -ls /technology/dps/real

这是我在 hdfs 中的数据结构-

row format delimited 
fields terminated by '\29'
collection items terminated by ','
map keys terminated by ':'
stored as textfile

如何编写 MapReduce 作业来解决这个特定问题,以便我可以找到哪些文件包含特定字符串?任何简单的例子都会对我有很大帮助。

更新:-

通过在 Unix 中使用 grep 我可以解决上述问题场景,但它非常非常慢并且需要大量时间才能获得实际输出 -

hadoop fs -ls /technology/dps/real | awk '{print $8}' | while read f; do hadoop fs -cat $f | grep cec7051a1380a47a4497a107fecb84c1 >/dev/null && echo $f; done

所以这就是我正在寻找一些 MapReduce 工作来解决这类问题的原因......

4

3 回答 3

4

听起来您正在寻找一个类似 grep 的程序,该程序很容易使用Hadoop Streaming实现(Hadoop Java API 也可以使用):

首先,编写一个映射器,如果正在处理的行包含您的搜索字符串,则输出正在处理的文件的名称。我使用了 Python,但任何语言都可以使用:

#!/usr/bin/env python
import os
import sys

SEARCH_STRING = os.environ["SEARCH_STRING"]

for line in sys.stdin:
    if SEARCH_STRING in line.split():
        print os.environ["map_input_file"]

此代码从SEARCH_STRING环境变量中读取搜索字符串。在这里,我拆分输入行并检查搜索字符串是否与任何拆分匹配;您可以更改它以执行子字符串搜索或使用正则表达式来检查匹配项。

接下来,使用此映射器运行 ​​Hadoop 流式作业,不使用减速器:

$ bin/hadoop jar contrib/streaming/hadoop-streaming-*.jar \
    -D mapred.reduce.tasks=0
    -input hdfs:///data \
    -mapper search.py \
    -file search.py \
    -output /search_results \
    -cmdenv SEARCH_STRING="Apache"

输出将分几个部分编写;要获取匹配列表,您可以简单地对文件进行分类(只要它们不太大):

$ bin/hadoop fs -cat /search_results/part-*
hdfs://localhost/data/CHANGES.txt
hdfs://localhost/data/CHANGES.txt
hdfs://localhost/data/ivy.xml   
hdfs://localhost/data/README.txt
... 
于 2012-07-31T06:24:36.977 回答
1

要获取您当前正在处理的文件名,请执行以下操作:

((FileSplit) context.getInputSplit()).getPath().getName() 

当您按记录搜索文件记录时,当您看到 时hello,发出上述路径(可能是行或其他任何内容)。

将 reducer 的数量设置为 0,它们在这里没有做任何事情。


“行格式分隔”是否意味着行由换行符分隔?在这种情况下TextInputFormatLineRecordReader在这里工作正常。

于 2012-07-31T06:08:56.410 回答
0

你可以尝试这样的事情,但我不确定这是否是一种有效的方法。让我知道它是否有效 - 我没有测试过它或任何东西。

您可以像这样使用它:java SearchFiles /technology/dps/real hello确保您从适当的目录运行它。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Scanner;

public class SearchFiles {

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("Usage: [search-dir] [search-string]");
            return;
        }
        File searchDir = new File(args[0]);
        String searchString = args[1];
        ArrayList<File> matches = checkFiles(searchDir.listFiles(), searchString, new ArrayList<File>());
        System.out.println("These files contain '" + searchString + "':");
        for (File file : matches) {
            System.out.println(file.getPath());
        }
    }

    private static ArrayList<File> checkFiles(File[] files, String search, ArrayList<File> acc) throws IOException {
        for (File file : files) {
            if (file.isDirectory()) {
                checkFiles(file.listFiles(), search, acc);
            } else {
                if (fileContainsString(file, search)) {
                    acc.add(file);
                }
            }
        }
        return acc;
    }

    private static boolean fileContainsString(File file, String search) throws IOException {
        BufferedReader in = new BufferedReader(new FileReader(file));
        String line;
        while ((line = in.readLine()) != null) {
            if (line.contains(search)) {
                in.close();
                return true;
            }
        }
        in.close();
        return false;
    }
}
于 2012-07-31T06:03:04.403 回答