3

我在 Java 中的并发和映射方面遇到了一些麻烦。基本上我有多个线程使用(读取和修改)他们自己的地图,但是这些地图中的每一个都是一个更大的地图的一部分,正在被另一个线程读取和修改:

我的主要方法创建所有线程,线程创建它们各自的映射,然后将其放入“主”映射:

Map<String, MyObject> mainMap = new HashMap<String, Integer>();
FirstThread t1 = new FirstThread();
mainMap.putAll(t1.getMap());
t1.start();
SecondThread t2 = new SecondThread();
mainMap.putAll(t2.getMap());
t2.start();
ThirdThread t3 = new ThirdThread(mainMap);
t3.start();

我现在面临的问题是第三个(主)线程在映射中看到任意值,这取决于一个或两个其他线程何时更新“他们的”项目。然而,我必须保证第三个线程可以迭代 - 并使用 - 映射的值,而不必担心正在读取的部分内容是“旧的”:

FirstThread(类似于 SecondThread):

for (MyObject o : map.values()) {
    o.setNewValue(getNewValue());
}

第三线程:

for (MyObject o : map.values()) {
    doSomethingWith(o.getNewValue());
}

有任何想法吗?我考虑过使用全局可访问(通过静态类的静态最终对象)锁,当必须修改映射时,它将在每个线程中同步。或者是否有特定的 Map 实现可以评估我可以使用的这个特定问题?

提前致谢!

编辑: 正如@Pyranja 所建议的,可以同步 getNewValue() 方法。但是我忘了提到我实际上是在尝试按照事务的方式做一些事情,其中​​ t1 和 t2 在 t3 使用所述值之前/之后修改多个值。t3 的实现方式是 doSomethingWith() 如果它没有改变,它实际上不会对值做任何事情。

4

3 回答 3

3

要在比单个值对象更高的级别上进行同步,您需要锁来处理各个线程之间的同步。在不过多更改代码的情况下执行此操作的一种方法是ReadWriteLock。线程 1 和线程 2 是写入器,线程 3 是读取器。

您可以使用两把锁或一把锁来执行此操作。我在下面勾勒出用一个锁、两个写入线程和一个读取线程来完成它,而不用担心在数据更新期间会发生什么异常(即事务回滚......)。

话虽如此,这听起来像是一个经典的生产者-消费者场景。您应该考虑使用BlockingQueue 之类的东西在线程之间进行通信,如本问题所述。

您可能还需要考虑更改其他内容,例如使用 Runnable 而不是扩展 Thread

private static final class Value {

    public void update() {

    }

}

private static final class Key {

}

private final class MyReaderThread extends Thread {

    private final Map<Key, Value> allValues;

    public MyReaderThread(Map<Key, Value> allValues) {
        this.allValues = allValues;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            readData();
        }
    }

    private void readData() {
        readLock.lock();
        try {
            for (Value value : allValues.values()) {
                // Do something
            }
        }
        finally {
            readLock.unlock();
        }

    }
}

private final class WriterThread extends Thread {

    private final Map<Key, Value> data = new HashMap<Key, Value>();

    @Override
    public void run() {
        while (!isInterrupted()) {
            writeData();
        }
    }

    private void writeData() {
        writeLock.lock();

        try {
            for (Value value : data.values()) {
                value.update();
            }
        }
        finally {
            writeLock.unlock();
        }
    }
}

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private final ReadLock readLock;
private final WriteLock writeLock;

public Thing() {
    readLock = lock.readLock();
    writeLock = lock.writeLock();
}

public void doStuff() {
    WriterThread thread1 = new WriterThread();
    WriterThread thread2 = new WriterThread();

    Map<Key, Value> allValues = new HashMap<Key, Value>();
    allValues.putAll(thread1.data);
    allValues.putAll(thread2.data);
    MyReaderThread thread3 = new MyReaderThread(allValues);

    thread1.start();
    thread2.start();
    thread3.start();
}
于 2013-01-30T22:49:15.617 回答
2

ConcurrentHashMapfrom java.util.concurrent- Map 的线程安全实现,它提供了比 synchronizedMap 高得多的并发度。只是很多读取几乎总是可以并行执行,同时读取和写入通常可以并行完成,多个同时记录通常可以并行完成。(该类ConcurrentReaderHashMap为多个读取操作提供了类似的并行性,但只允许一个活动的写入操作。)ConcurrentHashMap旨在优化检索操作。

于 2013-01-30T21:05:36.400 回答
1

您的示例代码可能具有误导性。在您的第一个示例中,您创建 aHashMap<String,Integer>但第二部分迭代地图值,在这种情况下为MyObject. 同步的关键是了解共享可变状态的位置和位置。

AnInteger是不可变的。它可以自由共享(但对 an 的引用Integer 可变的 - 它必须安全地发布和/或同步)。但是您的代码示例表明地图填充了可变MyObject实例。

鉴于MyObject任何线程都不会更改映射条目(键 -> 引用),并且所有映射都是在任何线程启动之前创建并安全发布的,我认为足以同步MyObject. 例如:

public class MyObject {
   private Object value;

   synchronized Object getNewValue() {
      return value;
   }

   synchronized void setNewValue(final Object newValue) {
      this.value = newValue;
   }
}

如果我的假设不正确,请澄清您的问题/代码示例,并考虑@jacobm 的评论和@Alex 的回答。

于 2013-01-30T21:24:14.593 回答