0

我正在尝试多线程导入作业,但遇到了导致重复数据的问题。我需要将我的地图保持在循环之外,以便我的所有线程都可以更新并从中读取,但是如果它没有最终版本,我就无法做到这一点,并且如果它是最终版本,我将无法更新地图。目前我需要将我的 Map 对象放在 run 方法中,但是当值最初不在数据库中并且每个线程创建一个新值时,问题就出现了。这会导致数据库中出现重复数据。有人知道如何进行某种回调以更新我在外面的地图吗?

ExecutorService executorService = Executors.newFixedThreadPool(10);

final Map<Integer, Object> map = new HashMap<>();
map.putAll(populate from database);
for (int i = 0; i < 10; i++) {

    executorService.execute(new Runnable() {
        public void run() {

        while ((line = br.readLine()) != null) {
            if(map.containsKey(123)) {
                //read map object
                session.update(object);                
            } else {
                map.put(123,someObject);
                session.save(object);
            }            

            if(rowCount % 250 == 0)
            tx.commit;
        });

}

executorService.shutdown();
4

4 回答 4

1

我会建议以下解决方案

  • 利用ConcurrentHashmap
  • 不要在你的爬行线程中使用updatecommit
  • 当您的地图在单独的线程中达到临界大小时save触发。commit

伪代码示例:

final Object lock = new Object();

...

executorService.execute(new Runnable() {
    public void run() {
        ...
        synchronized(lock){
            if(concurrentMap.size() > 250){
               saveInASeparateThread(concurrentMap.values().removeAll()));          
            }
        }
    }
}
于 2013-10-28T18:42:07.513 回答
1

您需要使用一些同步技术。

有问题的部分是当不同的线程试图将一些数据放入地图时。

例子:

线程 1 正在检查 map 中是否存在键为 123 的对象。在线程 1 将新对象添加到映射之前,线程 2 被执行。线程 2 还检查是否存在键为 123 的对象。然后两个线程都将对象 123 添加到映射中。这会导致重复...

您可以在此处阅读有关同步的更多信息

http://docs.oracle.com/javase/tutorial/essential/concurrency/sync.html

于 2013-10-28T17:57:01.057 回答
1

以下逻辑解决了我的问题。下面的代码未经测试。

ExecutorService executorService = Executors.newFixedThreadPool(10);

final Map<Integer, Object> map = new ConcurrentHashMap<>();
map.putAll(myObjectList);

List<Future> futures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
    final thread = i;

    Future future = executorService.submit(new Callable() {
        public void call() {

        List<MyObject> list;

        CSVReader reader = new CSVReader(new InputStreamReader(csvFile.getStream()));

        list = bean.parse(strategy, reader);

        int listSize = list.size();
        int rowCount = 0;

        for(MyObject myObject : list) {

            rowCount++;

            Integer key = myObject.getId();

            if(map.putIfAbsent(key, myObject) == null) {
                session.save(object);                
            } else {
                myObject = map.get(key);
                //Do something
                session.update(myObject);
            }            

            if(rowCount % 250 == 0 || rowCount == listSize) {
                tx.flush();
                tx.clear();
            }
        };

        tx.commit();

        return "Thread " + thread + " completed."; 

    });  

    futures.add(future);  
}

for(Future future : futures) {
    System.out.println(future.get());
}

executorService.shutdown();
于 2013-11-13T19:46:02.717 回答
1

根据您的问题描述,您似乎希望拥有一个数据一致的地图,并且您始终拥有最新的最新数据而不会错过任何更新。

在这种情况下,让您映射为Collections.synchronizedMap(). 这将确保对映射的所有读取和写入更新都是同步的,因此您可以保证使用映射中的最新数据找到密钥,并保证专门写入映射。

有关地图使用的并发技术之间的区别,请参阅此SO 讨论。

此外,还有一件事——将地图定义为最终地图并不意味着你不能修改地图——你绝对可以在地图中添加和删除元素。但是,您不能做的是将变量更改为指向另一个地图。下面的简单代码片段说明了这一点:

    private final Map<Integer, String> testMap = Collections.synchronizedMap(new HashMap<Integer,String>());
    testMap.add(1,"Tom"); //OK
    testMap.remove(1);   //OK
    testMap = new HashMap<Integer,String>(); //ERROR!! Cannot modify a variable with the final modifier
于 2013-10-28T18:20:17.583 回答