1

您好,我已经实现了一种方法,用于计算包含数百万个整数元素的数组的众数(mode)。我现在将顺序版本与使用 ExecutorService 的(本应更快的)改进版本进行比较……不幸的是,性能不如预期:

Sequentially iterating HashMap (version 0)

#size   #time       #memory
10000000    13772ms     565mb
20000000    35355ms     1135mb
30000000    45879ms     1633mb

Assigning jobs to a Service Executor (version 2)
#size   #time       #memory
10000000    16186ms     573mb
20000000    34561ms     1147mb
30000000    54792ms     1719mb

Executor Service 的代码如下:

 /* Optimised-Threaded Method to calculate the Mode */
    /**
     * Computes the mode (most frequent value) of {@code mybigarray}.
     *
     * <p>Frequencies are counted sequentially into a sorted map; the search for the
     * highest count is then split into jobs that run on a fixed thread pool, each job
     * scanning one contiguous key range of the map.
     *
     * @param mybigarray values to analyse
     * @return the most frequent value, or -1 when the array is empty
     */
    private int getModeOptimisedThread(int[] mybigarray){
        System.out.println("calculating mode (optimised w/ ExecutorService)... ");

        int mode = -1;

        // Count occurrences: key = array value, value = frequency. A TreeMap (sorted)
        // is used so the key space can be cut into contiguous sub-maps below.
        TreeMap<Integer, Integer> treemap = new TreeMap<Integer, Integer>();
        for (int i : mybigarray) {
            Integer frequency = treemap.get(i);
            // first occurrence -> 1, otherwise increment the stored frequency
            treemap.put(i, frequency == null ? 1 : frequency + 1);
        }

        System.out.println("Map size "+treemap.size());
        if (treemap.isEmpty()) {
            return mode;    // nothing to scan
        }

        //Look for the most frequent element in the map
        int maxCount = 0;

        //shared holder for the best (mode, count) pair reported by the workers
        Result r = new Result(mode, maxCount);

        //set the number of jobs
        int num_jobs = 10;

        // Partition by KEY range, not by position: subMap() takes keys, so slicing with
        // positional indexes would skip every key outside [0, map size).
        // long arithmetic avoids overflow when the keys span most of the int range.
        long first_key = treemap.firstKey();
        long last_key = treemap.lastKey();
        long key_span = last_key - first_key + 1;
        long keys_per_job = (key_span + num_jobs - 1) / num_jobs;   // ceiling, always >= 1

        System.out.println("Keys per job "+keys_per_job);

        List<Callable<Object>> todolist = new ArrayList<Callable<Object>>(num_jobs);

        //build one job per key range; both bounds are inclusive
        for (long start_key = first_key; start_key <= last_key; start_key += keys_per_job) {
            long finish_key = Math.min(start_key + keys_per_job - 1, last_key);

            System.out.println("start key: "+start_key+". Finish key: "+finish_key);
            todolist.add(Executors.callable(
                    new MapWorker(treemap.subMap((int) start_key, true, (int) finish_key, true), r)));
        }

        int n_threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(n_threads);
        try{
            //invokeAll will not return until all the tasks are completed
            es.invokeAll(todolist);
        } catch (InterruptedException e) {
            // keep the interrupt visible to the caller instead of swallowing it
            Thread.currentThread().interrupt();
            System.out.println("Error in the Service executor "+e);
        } catch (Exception e) {
            System.out.println("Error in the Service executor "+e);
        } finally {
            // release the pool threads; without this they outlive the call
            es.shutdown();
            //finally the result
            mode = r.getMode();
        }

        //return the result
        return mode;
    }

对这段 ExecutorService 代码的质量有什么建议吗?请多指教,这是我第一次使用 ExecutorService。

编辑:

Worker 公共类 MapWorker 实现 Runnable{

    private int index;
    private int size;
    private int maxCount;
    private Result result;
    private  Map <Integer, Integer> map;

    //Constructor    
    MapWorker( Map <Integer, Integer> _map, Result _result){        
        this.maxCount = 0;
        this.result = _result;
        map = _map;
    }

    public void run(){               
        for (Map.Entry<Integer, Integer> element : map.entrySet()) {
            if (element.getValue() > result.getCount()) {                
                 result.setNewMode(element.getKey(),element.getValue());                 
            }
        }         
    }

}

以及 Result 类:

/**
 * Holder for the best (mode, count) pair found so far. All accessors synchronize on
 * the instance, so one object can be shared between worker threads.
 */
public class Result {
    private int mode;
    private int maxCount;

    /**
     * @param _mode     initial mode value
     * @param _maxcount initial count associated with {@code _mode}
     */
    Result(int _mode, int _maxcount){
        mode = _mode;
        maxCount = _maxcount;
    }

    /** Unconditionally replaces the stored mode and its count. */
    public synchronized void setNewMode(int _newmode, int _maxcount) {
        this.mode = _newmode;
        this.maxCount = _maxcount;
    }

    /**
     * Atomically replaces the stored pair when {@code _maxcount} is strictly greater
     * than the stored count. Use this instead of a separate getCount()/setNewMode()
     * pair, which another thread can interleave with.
     *
     * @return {@code true} if the stored pair was replaced
     */
    public synchronized boolean updateIfGreater(int _newmode, int _maxcount) {
        if (_maxcount > this.maxCount) {
            this.mode = _newmode;
            this.maxCount = _maxcount;
            return true;
        }
        return false;
    }

    /** @return the stored mode; synchronized so it pairs with the writers' lock */
    public synchronized int getMode() {
        return mode;
    }

    /** @return the count associated with the stored mode */
    public synchronized int getCount() {
        return maxCount;
    }

}
4

2 回答 2

1
  1. 对于每个作业,使用单独的 Result 对象(不同步)。当所有作业完成后,选择具有最大值的结果。

  2. int num_jobs = n_threads;

于 2013-02-01T02:49:44.730 回答
1

大部分工作量都花在计算频率上,它的开销远远超过并行更新结果所能带来的任何收益。您需要并行化的是频率的计算:让每个工作线程先在本地统计频率,最后再把结果合并到全局频率表中。可以考虑在全局存储中使用 AtomicInteger 来保存计数,以确保线程安全。频率计算完成后,可以在最后顺序地计算众数,因为顺序遍历映射的计算成本要低得多。

类似下面这样的实现应该效果更好。编辑:修改了 updateScore() 方法以修复数据竞争。

    私有静态类 ResultStore {

private Map<Integer, AtomicInteger> store = new ConcurrentHashMap<Integer, AtomicInteger>(); public int size() { return store.size(); } public int updateScore(int key, int freq) { AtomicInteger value = store.get(key); if (value == null) { store.putIfAbsent(key, new AtomicInteger(0)); value = store.get(key); } return value.addAndGet(freq); } public int getMode() { int mode = 0; int modeFreq = 0; for (Integer key : store.keySet()) { int value = store.get(key).intValue(); if (modeFreq < value) { modeFreq = value; mode = key; } } return mode; } } private static int computeMode(final int[] mybigarray) { int n_threads = Runtime.getRuntime().availableProcessors(); ExecutorService es = Executors.newFixedThreadPool(n_threads); final ResultStore rs = new ResultStore(); //set the number of jobs int num_jobs = 10; int job_size = mybigarray.length / num_jobs; System.out.println("Map size " + mybigarray.length); System.out.println("Job size " + job_size); List<Callable<Object>> todolist = new ArrayList<Callable<Object>>(num_jobs); for (int i = 0; i < num_jobs; i++) { final int start_index = i * job_size; final int finish_index = start_index + job_size; System.out.println("Start index: " + start_index + ". Finish index: " + finish_index); todolist.add(Executors.callable(new Runnable() { @Override public void run() { final Map<Integer, Integer> localStore = new HashMap<Integer, Integer>(); for (int i = start_index; i < finish_index; i++) { final Integer loopKey = mybigarray[i]; Integer loopValue = localStore.get(loopKey); if (loopValue == null) { localStore.put(loopKey, 1); } else { localStore.put(loopKey, loopValue + 1); } } for (Integer loopKey : localStore.keySet()) { final Integer loopValue = localStore.get(loopKey); rs.updateScore(loopKey, loopValue); } } })); } try { //invoke all will not return until all the tasks are completed es.invokeAll(todolist); } catch (Exception e) { System.out.println("Error in the Service executor " + e); } return rs.getMode(); }

于 2013-02-01T22:14:38.747 回答