2

我用 Java 实现了一个 wordcount 程序。基本上,该程序需要一个大文件(在我的测试中,我使用了一个仅包含数字的 10 gb 数据文件),并计算每个“单词”出现的次数 - 在这种情况下,可能会出现一个数字(例如 23723 243 次)。

下面是我的实现。我寻求改进它,主要考虑性能,但也考虑其他一些事情,我正在寻找一些指导。以下是我希望纠正的几个问题:

  1. 目前,该程序是线程化的并且可以正常工作。但是,我所做的是将一块内存传递(500MB/NUM_THREADS)给每个线程,然后每个线程继续进行字数计数。这里的问题是我让主线程等待所有线程完成,然后再将更多数据传递给每个线程。这不是什么大问题,但是有一段时间,几个线程会等待一段时间,什么也不做。我相信某种工作池或执行器服务可以解决这个问题(我还没有学习过这个的语法)。

  2. 该程序仅适用于包含整数的文件。那是个问题。我为此苦苦挣扎,因为我不知道如何在不创建大量未使用变量的情况下迭代数据(使用 String 甚至 StringBuilder 的性能都很糟糕)。目前,我使用我知道输入是整数的事实,并且只是将临时变量存储为int,所以那里没有内存问题。我希望能够使用某种分隔符,无论该分隔符是空格还是几个字符。

  3. 我正在使用全局 ConcurrentHashMap 来记录键值对。例如,如果一个线程找到一个数字“24624”,它会在地图中搜索该数字。如果存在,它将将该键的值增加一。末尾键的值表示该键的出现次数。那么这是正确的设计吗?我会通过给每个线程自己的哈希图,然后在最后合并它们来提高性能吗?

  4. 有没有其他方法可以在不使用 RandomAccessMemory 类的情况下通过偏移量查找文件?这个类只会读入一个字节数组,然后我必须对其进行转换。我没有为这种转换计时,但也许使用其他东西可能会更快。

我也对其他可能性持开放态度,这就是我想到的。

注意:拆分文件不是我想探索的选项,因为我可能将其部署在我不应该创建自己的文件的服务器上,但如果它真的会提高性能,我可能会听。

其他注意事项:我是 Java 线程的新手,也是 StackOverflow 的新手。要温柔。

    public class BigCount2 {
        public static void main(String[] args) throws IOException, InterruptedException {
            int num, counter;
            long i, j;
            String delimiterString = " ";
            ArrayList<Character> delim = new ArrayList<Character>();
            for (char c : delimiterString.toCharArray()) {
                delim.add(c);
            }
            int counter2 = 0;
            num = Integer.parseInt(args[0]);
            int bytesToRead = 1024 * 1024 * 1024 / 2; //500 MB, size of loop
            int remainder = bytesToRead % num;
            int k = 0;
            bytesToRead = bytesToRead - remainder;
            int byr = bytesToRead / num;
            String filepath = "C:/Users/Daniel/Desktop/int-dataset-10g.dat";
            RandomAccessFile file = new RandomAccessFile(filepath, "r");
            Thread[] t = new Thread [num];//array of threads
            ConcurrentMap<Integer, Integer> wordCountMap = new ConcurrentHashMap<Integer, Integer>(25000);
            byte [] byteArray = new byte [byr]; //allocates 500mb to a 2D byte array
            char[] newbyte;
            for (i = 0; i < file.length(); i += bytesToRead) {
                counter = 0;
                for (j = 0; j < bytesToRead; j += byr) {
                    file.seek(i + j);
                    file.read(byteArray, 0, byr);
                    newbyte = new String(byteArray).toCharArray();
                    t[counter] = new Thread(
                            new BigCountThread2(counter, 
                                newbyte, 
                                delim, 
                                wordCountMap));//giving each thread t[i] different file fileReader[i] 
                    t[counter].start();
                    counter++;
                    newbyte = null;
                }
                for (k = 0; k < num; k++){
                    t[k].join(); //main thread continues after ALL threads have finished. 
                }
                counter2++;
                System.gc();
            }
            file.close();
            System.exit(0);
        }
    }   

class BigCountThread2 implements Runnable {
    private final ConcurrentMap<Integer, Integer> wordCountMap;
    char [] newbyte;
    private ArrayList<Character> delim;
    private int threadId; //use for later
    BigCountThread2(int tid, 
            char[] newbyte, 
            ArrayList<Character> delim,
            ConcurrentMap<Integer, Integer> wordCountMap) { 
        this.delim = delim;
        threadId = tid;
        this.wordCountMap = wordCountMap;
        this.newbyte = newbyte;
    }
    public void run() {
        int intCheck = 0;
        int counter = 0; int i = 0; Integer check;  int j =0; int temp = 0; int intbuilder = 0;
        for (i = 0; i < newbyte.length; i++) {
            intCheck = Character.getNumericValue(newbyte[i]);
            if (newbyte[i] == ' ' || intCheck == -1) {    //once a delimiter is found, the current tempArray needs to be added to the MAP
                check = wordCountMap.putIfAbsent(intbuilder, 1);
                if (check != null) { //if returns null, then it is the first instance
                    wordCountMap.put(intbuilder, wordCountMap.get(intbuilder) + 1);
                }

                intbuilder = 0;
            }

            else {
                intbuilder = (intbuilder * 10) + intCheck;
                counter++;
            }

        }
    }
}
4

1 回答 1

2

关于一点点的一些想法..

..我相信某种工作池或执行器服务可以解决这个问题(我还没有学习过这个的语法)。

如果所有线程花费大约相同的时间来处理相同数量的数据,那么这里真的没有那么多“问题”。

然而,线程池的一个好处是它允许人们相当简单地调整一些基本参数,例如并发工作人员的数量。此外,使用executor service和 Futures 可以提供额外的抽象级别;在这种情况下,如果每个线程都返回一个映射作为结果,它可能会特别方便。

该程序仅适用于包含整数的文件。那是个问题。我为此苦苦挣扎,因为我不知道如何在不创建大量未使用变量的情况下迭代数据(使用 String 甚至 StringBuilder 的性能都很糟糕)..

这听起来像是一个实施问题。虽然我会首先尝试StreamTokenizer(因为它已经编写好了),但如果手动执行,我会检查源代码——在简化“令牌”的概念时可以省略其中的一部分。(它使用一个临时数组来构建令牌。)

我正在使用全局 ConcurrentHashMap 来记录键值对。.. 那么这是正确的设计吗?我会通过给每个线程自己的哈希图,然后在最后合并它们来提高性能吗?

使用每个线程和合并策略单独的映射将减少锁定并可能提高性能。此外,当前的实现被破坏了,因为wordCountMap.put(intbuilder, wordCountMap.get(intbuilder) + 1)不是原子的,因此操作可能被低估了。我会使用单独的映射,因为减少可变共享状态会使线程程序更容易推理。

有没有其他方法可以在不使用 RandomAccessMemory 类的情况下通过偏移量查找文件?这个类只会读入一个字节数组,然后我必须对其进行转换。我没有为这种转换计时,但也许使用其他东西可能会更快。

考虑对同一文件的每个线程使用 FileReader(和 BufferedReader) 。这将避免必须首先将文件复制到数组中并将其分割为单个线程,虽然总读取量相同,但避免了占用如此多的内存。完成的读取实际上不是随机访问,而只是从不同的偏移量开始的顺序(带有“跳过”) - 每个线程仍然在互斥范围内工作。

此外,如果一个整数值被“切割”成两半,那么带有切片的原始代码就会被破坏,因为每个线程都会读取一半的单词。一个解决方法是让每个线程跳过第一个单词,如果它是前一个块的延续(即更快地扫描一个字节),然后根据需要读取其范围的末尾以完成最后一个单词。

于 2014-06-10T05:48:37.423 回答