0

我正在处理一个从 .csv 文件中读取项目并将它们写入远程数据库的程序。我正在尝试对程序进行多线程处理,为此我创建了 2 个具有不同连接的进程线程。为此,将 .csv 文件读入缓冲阅读器,并处理缓冲阅读器的内容。但是,线程似乎一直在复制数据(将每个元组的两个副本写入数据库)。

我一直在试图弄清楚如何在 Java 中对缓冲区进行互斥,而我能想到的最接近的事情是优先级队列。

我的问题是,您可以使用缓冲阅读器将文件逐行读入优先级队列吗?IE

public void readFile(Connection connection) {
        BufferedReader bufReader = null;
        try{
            bufReader = new BufferedReader(new FileReader(RECS_FILE));
            bufReader.readLine(); //skip header line
            String line;
            while((line = bufReader.readLine()) != null) {
                //extract fields from each line of the RECS_FILE
                Pattern pattern = Pattern.compile( "\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\",\"([^\"]+)\"");
                Matcher matcher = pattern.matcher(line); 
                if(!matcher.matches()) {
                    System.err.println("Unexpected line in "+RECS_FILE+": \""+line+"\"");
                    continue;
                }
                String stockSymbol = matcher.group(1);
                String recDateStr = matcher.group(2);
                String direction = matcher.group(3);
                String completeUrl = matcher.group(4);

                //create recommendation object to populate required fields
                //  and insert it into the database
                System.out.println("Inserting to DB!");
                Recommendation rec = new Recommendation(stockSymbol, recDate, direction, completeUrl);
                rec.insertToDb(connection);
            }
        } catch (IOException e) {
            System.err.println("Unable to read "+RECS_FILE);
            e.printStackTrace();
        } finally {
            if(bufReader != null) {
                try{
                    bufReader.close();
                } catch (IOException e) {
                }
            }
        }

    }

您会看到缓冲阅读器用于读取 .csv 文件。有没有办法在函数外部设置优先级队列,以便缓冲读取器将元组放入优先级队列中,然后每个程序线程访问优先级队列?

4

2 回答 2

1

缓冲读取器,或者实际上任何读取器或流本质上仅用于单线程使用。优先级队列是一个完全独立的结构,取决于实际的实现,它可能被多个线程使用,也可能不被使用。所以简短的回答是:不,它们是两个完全不相关的概念。

为了解决您最初的问题:您不能使用多线程的流式文件访问。您可以RandomAccessFile在理论上使用,除了您的行不是固定宽度,因此您不能seek()在不读取文件中的所有内容的情况下到达行的开头。此外,即使您的数据由固定记录组成,使用两个不同线程读取文件也可能不切实际。

您唯一可以并行化的是数据库插入,但明显需要注意的是您失去了事务性,因为您必须为每个线程使用单独的事务。(如果你不这样做,你必须同步你的数据库操作,这再次意味着你没有赢得任何东西。)

因此,一种解决方案可以是从一个线程读取行并将字符串传递给通过ExecutorService. 这将很好地扩展,但同样有一个警告:数据库锁定的增加的开销可能会抵消使用多线程的优势。

最终的教训可能是不要使事情过于复杂:尝试简单的方法,如果简单的方法不起作用,则只寻找更复杂的解决方案。另一个教训可能是多线程对 I/O 密集型程序没有帮助。

于 2012-03-24T21:45:06.630 回答
0

@Biziclop 的答案在 (+1) 上,但我想我会添加一些关于批量数据库插入的内容。

如果您不知道,在大多数 SQL 数据库中关闭数据库自动提交是批量插入期间的一大胜利。通常在每个 SQL 语句之后,数据库将其提交到磁盘存储,该存储更新索引并对磁盘结构进行所有更改。通过关闭此自动提交,数据库只需在您commit最后调用时进行这些更改。通常,您会执行以下操作:

conn.setAutoCommit(false);
for (Recommendation rec : toBeInsertedList) {
    rec.insertToDb(connection);
}
conn.setAutoCommit(true);

此外,如果您的数据库不支持自动提交,则通常将插入包装在事务中会完成同样的事情。

以下是一些可能有帮助的其他答案:

于 2012-03-25T00:05:28.017 回答