1

我有一个 ConcurrentMap，它在我的 Runnable 之外实例化，但会在多个 Runnable 内部共享并更新。我的 Runnable 需要并发执行，但对 concurrentMap 的更新需要同步，以防止覆盖之前已有的条目。有人能告诉我哪里做错了吗？

/**
 * Demo driver: runs ten {@link Example} tasks on a ten-thread pool, all writing to one
 * shared {@link ConcurrentHashMap}, waits for them to finish and prints the map.
 */
public class ExecutionSubmitExample {

    /**
     * Submits the tasks, blocks until every one has completed, then dumps the shared map.
     *
     * @param args ignored
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *         {@link InterruptedException} raised while waiting for a task
     */
    public static void main(String[] args) {
        // Ten concurrent threads
        ExecutorService es = Executors.newFixedThreadPool(10);

        try {
            List<Future<Example>> tasks = new ArrayList<>();

            ConcurrentHashMap<Integer, String> concurrentMap = new ConcurrentHashMap<>();

            for (int x = 0; x < 10; x++) {
                Example example = new Example(concurrentMap, x);
                tasks.add(es.submit(example, example));
            }

            // Block until every task is done; the returned Example itself is not needed.
            for (Future<Example> future : tasks) {
                future.get();
            }

            for (Entry<Integer, String> obj : concurrentMap.entrySet()) {
                System.out.println("key " + obj.getKey() + " " + obj.getValue());
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } finally {
            // Always release the pool: its worker threads are non-daemon and would
            // otherwise keep the JVM alive after a failure.
            es.shutdown();
        }
    }
}

Runnable 类

/**
 * Task that tries to claim the indices 0, 1 and 2 in a map shared with other tasks.
 *
 * <p>NOTE: {@link #runAnalysis(int)} is an instance-level {@code synchronized} method, so
 * the monitor it takes is this {@code Example} object. Separate instances that share one
 * map therefore do not exclude each other, and the {@code containsKey}/{@code put} pair is
 * a check-then-act sequence that several tasks can interleave.
 */
public class Example implements Runnable {

    ConcurrentHashMap<Integer, String> concurrentMap;
    private int thread;

    /**
     * @param concurrentMap map shared between all tasks
     * @param thread        number used to label the entries this task writes
     */
    public Example(ConcurrentHashMap<Integer, String> concurrentMap, int thread) {
        this.thread = thread;
        this.concurrentMap = concurrentMap;
    }

    /** Runs the analysis for indices 0 through 2, in order. */
    @Override
    public void run() {
        int index = 0;
        while (index < 3) {
            runAnalysis(index);
            index++;
        }
    }

    /**
     * Reports the index as already present, or stores {@code "thread <n>"} under it.
     *
     * @param index key to look up and, if missing, to insert
     */
    public synchronized void runAnalysis(int index) {
        boolean alreadyPresent = concurrentMap.containsKey(index);
        if (alreadyPresent) {
            System.out.println("contains integer " + index);
            return;
        }
        System.out.println("put " + index + " thread " + thread);
        concurrentMap.put(index, "thread " + thread);
    }
}

结果——注意索引 0 被添加了多次，而不是一次。它应该只由线程 0 添加，而线程 9 读到的应该是“已包含”。我需要以某种方式让该方法对其他线程加锁，直到更新完成。

put 0 thread 0
put 0 thread 9
put 0 thread 6
put 0 thread 7
put 1 thread 7
put 0 thread 2
put 0 thread 1
put 0 thread 5
put 0 thread 3
put 0 thread 4
contains integer 1
contains integer 1
contains integer 1
contains integer 1
put 2 thread 7
put 1 thread 6
put 1 thread 9
put 1 thread 0
put 0 thread 8
contains integer 2
contains integer 2
contains integer 2
put 2 thread 2
put 2 thread 1
put 2 thread 5
put 2 thread 3
contains integer 1
contains integer 1
contains integer 2
contains integer 2
key 0 thread 8
key 2 thread 3
key 1 thread 0
4

4 回答 4

3

方法上的 synchronized 意味着在 this 对象上同步。由于您每次都在创建新对象

Example example = new Example(concurrentMap, x);

同步发生在不同的对象上,所以没有任何阻塞。

您需要在共享对象上使用 synchronized，或者使用共享的 Lock。它们可以传递给 Example 对象，或者您可以像 Luiggi 建议的那样使用 static 字段。在这种情况下，请确保该字段没有在其他任何地方被用于同步，否则可能会干扰此处的执行。

于 2013-11-04T17:17:21.803 回答
2

解决方案基于 Sotirios Delimanolis、Luiggi Mendoza、Sotirios Delimanolis 的答案。

主类（Main）

/**
 * Demo driver: starts ten {@link Example} tasks that share one {@link ConcurrentHashMap},
 * waits for all of them and prints the resulting map.
 */
public class ExecutionSubmitExample {

    /**
     * Submits the tasks, waits for completion, and prints every entry of the shared map.
     *
     * @param args ignored
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *         {@link InterruptedException} raised while waiting for a task
     */
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(10);

        try {
            List<Future<Example>> tasks = new ArrayList<>();

            ConcurrentHashMap<Integer, String> concurrentMap = new ConcurrentHashMap<>();

            for (int x = 0; x < 10; x++) {
                Example task = new Example(concurrentMap, x);
                tasks.add(es.submit(task, task));
            }
            // -- all tasks have been submitted; wait for each Example to finish

            for (Future<Example> future : tasks) {
                future.get();
            }

            for (Entry<Integer, String> obj : concurrentMap.entrySet()) {
                System.out.println("key " + obj.getKey() + " " + obj.getValue());
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException ie) {
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } finally {
            // Shut the pool down on every path; otherwise a failed task leaves the
            // non-daemon worker threads running and the JVM never exits.
            es.shutdown();
        }
    }
}

Runnable 类

/**
 * Task that claims the indices 0, 1 and 2 in a map shared with other {@code Example}
 * tasks, so that each index is written exactly once across all tasks.
 */
public class Example implements Runnable {

    /**
     * Single lock shared by every {@code Example}. It must be {@code static}: an
     * instance-level lock would give each task its own monitor, and the
     * containsKey/put sequence in {@link #runAnalysis(int)} could then interleave
     * between tasks.
     */
    private static final Object LOCK = new Object();

    ConcurrentHashMap<Integer, String> concurrentMap;
    private final int thread;

    /**
     * @param concurrentMap map shared between all tasks
     * @param thread        number used to label the entries this task writes
     */
    public Example(ConcurrentHashMap<Integer, String> concurrentMap, int thread) {
        this.concurrentMap = concurrentMap;
        this.thread = thread;
    }

    /** Runs the analysis for indices 0 through 2, in order. */
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            runAnalysis(i);
        }
    }

    /**
     * Stores {@code "thread <n>"} under {@code index} unless the index is already present.
     * The check and the insert happen under the class-wide lock, so only one task ever
     * inserts a given index.
     *
     * @param index key to look up and, if missing, to insert
     */
    public void runAnalysis(int index) {
        synchronized (LOCK) {
            if (concurrentMap.containsKey(index)) {
                System.out.println("contains integer " + index);
            } else {
                System.out.println("put " + index + " thread " + thread);
                concurrentMap.put(index, "thread " + thread);
            }
        }
    }
}

结果

put 0 thread 0
contains integer 0
put 1 thread 7
put 2 thread 7
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
key 0 thread 0
key 2 thread 7
key 1 thread 7
于 2013-11-04T17:31:48.097 回答
1

使用 ConcurrentMap 时应避免显式同步。它提供了专门的原子方法来处理这类操作。对于这种情况，应优先使用 putIfAbsent 方法，而不是先调用 containsKey 再调用 put：putIfAbsent 将“检查并插入”作为一个原子操作完成，因此不会出现两个线程同时通过检查的情况。

/**
 * Atomically claims {@code index} for this task via {@code putIfAbsent} and reports
 * whether this call inserted the entry or found one already present.
 *
 * @param index key to insert if it is not yet mapped
 */
public void runAnalysis(int index) {
    // putIfAbsent returns null only when no previous mapping existed, i.e. we inserted.
    boolean inserted = concurrentMap.putIfAbsent(index, "thread " + thread) == null;
    if (!inserted) {
        System.out.println("contains integer " + index);
        return;
    }
    System.out.println("put " + index + " thread " + thread);
}
于 2013-11-04T18:12:25.633 回答
0

如果您创建一个新类“RunAnalysis”并在该类中添加方法 runAnalysis() 的代码,如下所示:

/**
 * Helper that records, in a map shared by several tasks, which thread first claimed an
 * index.
 */
class RunAnalysis {
    /**
     * Stores {@code "thread <n>"} under {@code index} unless the index is already present,
     * printing which of the two happened.
     *
     * <p>The check and the insert run while holding the monitor of {@code concurrentMap}.
     * A {@code synchronized} method would lock this {@code RunAnalysis} instead, which
     * excludes nobody when each task creates its own helper instance; the map is the one
     * object all callers are known to share.
     *
     * @param concurrentMap map shared between all callers; also used as the lock
     * @param thread        number used to label the stored entry
     * @param index         key to look up and, if missing, to insert
     */
    public void analyse(ConcurrentHashMap<Integer, String> concurrentMap, int thread, int index) {
        synchronized (concurrentMap) {
            if (concurrentMap.containsKey(index)) {
                System.out.println("contains integer " + index);
            } else {
                System.out.println("put " + index + " thread " + thread);
                concurrentMap.put(index, "thread " + thread);
            }
        }
    }
}

/**
 * Task that claims the indices 0, 1 and 2 in a map shared with other {@code SyncExample}
 * tasks by delegating to a {@link RunAnalysis} helper.
 */
public class SyncExample implements Runnable {
    /**
     * One helper shared by every task. It is {@code static} on purpose: if each task
     * created its own {@code RunAnalysis}, a lock taken on the helper instance would be
     * private to that task and would not serialize access to the shared map.
     */
    static final RunAnalysis runAnalysis = new RunAnalysis();
    ConcurrentHashMap<Integer, String> concurrentMap;
    private int thread;

    /**
     * @param concurrentMap map shared between all tasks
     * @param thread        number used to label the entries this task writes
     */
    public SyncExample(ConcurrentHashMap<Integer, String> concurrentMap, int thread) {
        this.concurrentMap = concurrentMap;
        this.thread = thread;
    }

    /** Runs the analysis for indices 0 through 2, in order, through the shared helper. */
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            runAnalysis.analyse(concurrentMap, thread, i);
        }
    }
}

那么输出是:

put 0 thread 1
put 1 thread 1
put 2 thread 1
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 0
contains integer 1
contains integer 2
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
contains integer 0
contains integer 1
contains integer 2
key 0 thread 1
key 1 thread 1
key 2 thread 1
于 2016-10-08T08:05:56.060 回答