3

我有一个超过 700K + 行的巨大 CSV 文件。我必须解析该 CSV 文件的行并执行操作。我想通过使用线程来做到这一点。我一开始尝试做的很简单。每个线程都应处理 CSV 文件的唯一行。我的行数有限,只能读取 3000 行。我创建了三个线程。每个线程都应该读取 CSV 文件的一行。以下是代码:

import java.io.*;

class CSVOps implements Runnable
{
    static int lineCount = 1;
    static int limit = 3000;
    BufferedReader CSVBufferedReader;

    public CSVOps(){} // Default constructor

    public CSVOps(BufferedReader br){
        this.CSVBufferedReader = br;
    }

    private synchronized void readCSV(){
        System.out.println("Current thread "+Thread.currentThread().getName());
        String line;
        try {
            while((line = CSVBufferedReader.readLine()) != null){
                System.out.println(line);
                lineCount ++;
                if(lineCount >= limit){
                    break;
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        readCSV();
    }

}

class CSVResourceHandler
{
    String CSVPath;

    public CSVResourceHandler(){ }// default constructor

    public CSVResourceHandler(String path){
        File f = new File(path);
        if(f.exists()){
            CSVPath = path;
        }
        else{
            System.out.println("Wrong file path! You gave: "+path);
        }
    }

    public BufferedReader getCSVFileHandler(){
        BufferedReader br = null;
        try{
            FileReader is = new FileReader(CSVPath);
            br = new BufferedReader(is);
        }
        catch(Exception e){
        }
        return br;
    }
}

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/invalid_domain_kw_site_wise_click_rev2.csv";
        CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV);
        CSVOps ops = new CSVOps(csvResHandler.getCSVFileHandler());

        Thread t1 = new Thread(ops);
        t1.setName("T1");

        Thread t2 = new Thread(ops);
        t1.setName("T2");

        Thread t3 = new Thread(ops);
        t1.setName("T3");

        t1.start();
        t2.start();
        t3.start();
    }
}

CSVResourceHandler 类简单查找传递的文件是否存在,然后创建一个 BufferedReader 并提供它。这个阅读器被传递给CSVOps 类。它有一个方法 readCSV,它读取 CSV 文件的单行并打印出来。限制设置为 3000。

现在为了让线程不搞乱计数,我将这些限制和计数变量都声明为静态的。当我运行这个程序时,我得到了奇怪的输出。我只得到大约 1000 条记录,有时我得到 1500 条。它们是随机排列的。在输出结束时,我得到两行 CSV 文件,当前线程名称是 main !!

我是线程的新手。我希望阅读这个 CSV 文件变得更快。可以做什么?

4

2 回答 2

2

好的,首先,不要使用多个线程从单个机械磁盘进行并行 I/O。它实际上会降低性能,因为每次线程有机会运行时,机械头都需要寻找下一个读取位置。因此,您不必要地弹跳磁盘的磁头,这是一项昂贵的操作。

使用单个生产者多消费者模型使用单个线程读取行并使用工作池处理它们。

关于你的问题:

你真的不应该在退出主线程之前等待线程完成吗?

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        ...
        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();
    }
}
于 2012-06-26T13:14:17.587 回答
0

我建议大块阅读文件。分配一个大缓冲区对象,读取一个块,从末尾解析以找到最后一个 EOL 字符,将缓冲区的最后一位复制到临时字符串中,在 EOL+1 处将 null 推入缓冲区,从缓冲区排队引用,立即创建一个新的,首先复制临时字符串,然后填充缓冲区的其余部分并重复直到 EOF。重复直到完成。使用线程池来解析/处理缓冲区。

您必须将整块有效行排队。排队单行将导致线程通信花费的时间比解析时间长。

请注意,这和类似的情况可能会导致池中的线程“无序”处理块。如果必须保留顺序(例如,输入文件已排序,而输出将进入另一个必须保持排序的文件),则可以让块组装器线程在每个块对象中插入一个序列号。然后,池线程可以将处理过的缓冲区传递给另一个线程(或任务),该线程(或任务)保留一个乱序块列表,直到所有先前的块都进入。

多线程不一定是困难/危险/无效。如果您使用队列/池/任务,请避免同步/加入,不要不断地创建/终止/销毁线程,并且只在一次只有一个线程可以处理的大型缓冲区对象周围排队。你应该看到一个很好的加速,几乎没有死锁、错误共享等的可能性。

这种加速的下一步将是预先分配一个缓冲区池队列,以消除缓冲区和相关 GC 的持续创建/删除,并在每个缓冲区的开头使用(L1 缓存大小)“死区”完全消除缓存共享。这在多核机器上会很快(尤其是使用 SSD!)。

哦,Java,对。我为我用空终止符回答的“CplusPlus-iness”道歉。不过,其余的点都还可以。这应该是一个与语言无关的答案:)

于 2012-06-27T08:33:10.527 回答