6

我正在尝试使用 Java 读取一个非常大的文件。那个大文件会有这样的数据,这意味着每一行都有一个用户 ID。

149905320
1165665384
66969324
886633368
1145241312
286585320
1008665352

在那个大文件中将有大约 3000 万个用户 ID。现在我试图从那个大文件中一一读取所有用户 ID 一次。这意味着每个用户 ID 只能从该大文件中选择一次。例如,如果我有 3000 万个用户 ID,那么它应该使用多线程代码只打印一次 3000 万个用户 ID。

下面是我的代码,它是一个运行 10 个线程的多线程代码,但是使用下面的程序,我无法确保每个用户 ID 只被选择一次。

public class ReadingFile {


    public static void main(String[] args) {

        // create thread pool with given size
        ExecutorService service = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            service.submit(new FileTask());
        }
    }
}

class FileTask implements Runnable {

    @Override
    public void run() {

        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("D:/abc.txt"));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
                //do things with line
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                br.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

有人可以帮我吗?我做错了什么?最快的方法是什么?

4

3 回答 3

18

假设你没有做任何事情,比如跨多个磁盘条带化文件,你真的无法改进让一个线程顺序读取文件。使用一个线程,您执行一次查找,然后执行一次长顺序读取;使用多个线程,您将拥有导致多次寻道的线程,因为每个线程都获得了对磁盘头的控制。

编辑:这是一种在仍然使用串行 I/O 读取行的同时并行化行处理的方法。它使用BlockingQueue在线程之间进行通信;将FileTask行添加到队列中,然后CPUTask读取并处理它们。这是一个线程安全的数据结构,所以不需要添加任何同步。您put(E e)用于将字符串添加到队列中,因此如果队列已满(它可以容纳多达 200 个字符串,如 中的声明中所定义ReadingFile),则FileTask阻塞直到空间释放;同样,您take()用于从队列中删除项目,因此CPUTask将阻塞直到项目可用。

public class ReadingFile {
    public static void main(String[] args) {

        final int threadCount = 10;

        // BlockingQueue with a capacity of 200
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(200);

        // create thread pool with given size
        ExecutorService service = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < (threadCount - 1); i++) {
            service.submit(new CPUTask(queue));
        }

        // Wait til FileTask completes
        service.submit(new FileTask(queue)).get();

        service.shutdownNow();  // interrupt CPUTasks

        // Wait til CPUTasks terminate
        service.awaitTermination(365, TimeUnit.DAYS);

    }
}

class FileTask implements Runnable {

    private final BlockingQueue<String> queue;

    public FileTask(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("D:/abc.txt"));
            String line;
            while ((line = br.readLine()) != null) {
                // block if the queue is full
                queue.put(line);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

class CPUTask implements Runnable {

    private final BlockingQueue<String> queue;

    public CPUTask(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String line;
        while(true) {
            try {
                // block if the queue is empty
                line = queue.take(); 
                // do things with line
            } catch (InterruptedException ex) {
                break; // FileTask has completed
            }
        }
        // poll() returns null if the queue is empty
        while((line = queue.poll()) != null) {
            // do things with line;
        }
    }
}
于 2013-06-20T18:22:15.633 回答
0

我们谈论的是一个平均 315 MB 的文件,其中行由新行分隔。我认为这很容易融入记忆。这暗示了用户名中没有必须保留的特定顺序。所以我会推荐以下算法:

  • 获取文件长度
  • 将文件的每十分之一复制到一个字节缓冲区中(二进制复制应该很快)
  • 启动一个线程来处理每个缓冲区
  • 每个线程处理他所在区域中的所有行,除了第一行和最后一行。
  • 完成后,每个线程必须返回其数据中的第一个和最后一个部分行,
  • 每个线程的“最后一个”必须与处理下一个文件块的“第一个”重新组合,因为您可能已经切断了一行。然后必须在之后处理这些令牌。
于 2014-07-15T13:47:22.913 回答
-2

Fork Join API introduced in 1.7 is a great fit for this use case. Check out http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html. If you search, you are going to find lots of examples out there.

于 2013-06-20T18:26:00.613 回答