7

我有以下课程。我使用 ConcurrentHashMap。我有许多线程写入地图和一个计时器,每 5 分钟将数据保存在地图中。当我在映射中写入条目时,我设法通过使用 putIfAbsent() 来实现线程安全。但是,当我从中读取然后通过 clear() 方法删除所有条目时,我希望在读取映射内容然后删除它们的过程中没有其他线程写入映射。显然,即使使用了同步(锁定){},我的代码也不是线程安全的,b/c 在 saveEntries() 中拥有锁的线程不一定是在 log() 方法中写入我的地图的同一线程!除非我用同一个锁对象锁定 log() 中的整个代码!

我想知道是否有任何其他方法可以实现线程安全而无需通过外部锁强制同步?任何帮助是极大的赞赏。

public class Logging {

private static Logging instance;    
private static final String vendor1 = "vendor1";
private static final String vendor2 = "vendor2";    
private static long delay = 5 * 60 * 1000;

private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>();
private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>();

private Timer timer;    
private final Object lock = new Object();

private Logging(){
    timer = new Timer();                
    timer.schedule(new TimerTask() {
        public void run() {
            try {
                saveEntries();
            } catch (Throwable t) {
                timer.cancel();
                timer.purge();
            }
        }       
    }, 0, delay);
}

public static synchronized Logging getInstance(){     
    if (instance == null){
        instance = new Logging();
    }
    return instance;
 }

public void log(){      
    ConcurrentMap<String, Event> map;
    String key = "";        

    if (vendor1.equalsIgnoreCase(engine)){
        map = vendor1Calls;
    }else if(vendor2.equalsIgnoreCase(engine)){  
        map = vendor2Calls;
    }else{
        return;
    }       


    key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
    /*Event event = map.get(key);       

    // Map does not contain this service.method, create an Event for the first     time.
    if(event == null){
        event = new Event();            
        map.put(key, event);

        // Map already contains this key, just adjust the numbers.
    }else{
        // Modify the object fields
    }*/
    //}

    // Make it thread-safe using CHM
    Event newEvent = new Event();
    Event existingEvent= map.putIfAbsent(key, newEvent); 

    if(existingEvent!=null && existingEvent!=newEvent){
        // Modify the object fields
}       

private void saveEntries(){

    Map<String, List<Event>> engineCalls = null;
    try {           

        engineCalls = new HashMap<String, List<Event>>();
        List<Event> events = null;

// How can I achieve therad safety here w/o applying any lock?
        //synchronized(lock){
            if(!vendor1Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor1Calls.values());
                engineCalls.put(vendor1, events);
                vendor1Calls.clear();
            }
            if(!vendor2Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor2Calls.values());
                engineCalls.put(vendor2, events);
                vendor2Calls.clear();
            }
        //}

// logICalls() saves the events in the DB.          
        DBHandle.logCalls(engineCalls);
    } catch (Throwable t) {         
    } finally {
        if(engineCalls!=null){
            engineCalls.clear();
        }                       
    }   
}       

}

4

5 回答 5

3

但是,当我从中读取然后通过 clear() 方法删除所有条目时,我希望在读取映射内容然后删除它们的过程中没有其他线程写入映射。

我认为您想说的是您并不真正关心严格锁定地图。相反,您只真正关心 vender1Calls.values() 和 vendor1Calls.clear() 之间的任何日志条目的丢失,对吗?

在这种情况下,我可以想象你可以更换

events.addAll(vendor1Calls.values());
vendor1Calls.clear();

在 saveEntries 中有这个:

for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) {
    Event e = iter.next();
    events.add(e);
    iter.remove();
}

这样,您只需删除已添加到事件列表中的事件。当 saveEntries() 仍在执行时,您仍然可以写入 vendor1Calls 映射,但迭代器会跳过添加的值。

于 2012-08-24T20:04:09.420 回答
3

如果没有任何外部同步,您无法使用 CHM 实现此目的。返回的 Iterator 视图是弱一致的,这意味着 Map 的内容可以在您实际迭代它时发生变化。

看来您需要使用 aCollections.synchronizedMap来获得您正在寻找的功能。

编辑以使我的观点更清楚:

要实现这一点,synchronizedMap您首先必须synchronize在地图上,然后您可以将内容迭代或复制到另一个地图中,然后再清除。

Map map = Collections.synchronizedMap(new HashMap());

public void work(){
  Map local = new HashMap();
  synchronized(map){
     local.putAll(map);
     map.clear();
  }
  //do work on local instance 
}

local正如我提到的那样,您可以iterate + remove类似于@Kevin Jin 的回答,而不是实例。

于 2012-08-24T21:04:42.760 回答
0

此线程中显示了此示例的原子版本(仅使用 ConcurrentMap 中的功能)。

于 2012-08-25T12:35:47.007 回答
0

我最好的建议是使用ReadWriteLock但是当您明确声明您不想使用任何锁(顺便说一句ConcurrentHashMap可能会在内部使用它们)时,您可以尝试以下操作。

对每个地图使用AtomicReference,当需要记录其内容时,使用getAndSet将旧地图替换为全新的空地图。

您现在拥有一个专属使用的地图,您可以随心所欲地遍历和清除它。不幸的是,有一个小问题(使用锁将摆脱),那就是如果另一个线程正在添加到映射的过程中,而您将它换成一个空的。您也许可以在这一点上添加一个延迟,希望等待足够长的时间让另一个线程完成它正在做的事情。也许您可以使用 a 的一些内部功能ConcurrentHashMap来等到每个人都完成它。

于 2012-08-25T22:42:48.947 回答
0

以下代码使用功能性 java项目中的持久映射。它使用更多内存,但(AFAIK :) 可以安全地被多个线程使用。中唯一的可变值,它使用compare-and-set 进行更新。地图和事件是不可变的,因此是线程安全的。另外,我没有清除地图,而是替换了对它的引用。AtomicReference

import fj.F;
import fj.Ord;
import fj.data.TreeMap;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class Logging
{
    // Event is immutable
    private static class Event
    {
        // updates are done by creating new values
        Event update(String key)
        {
            return new Event();
        }
    }

    // Refactored code pertaining to one vendor into a separate class.
    private static class EngineLogger
    {
        public final String vendor;
        private final AtomicReference<TreeMap<String, Event>> mapRef =
                new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd));

        private EngineLogger(String vendor)
        {
            this.vendor = vendor;
        }

        public void log(String service, String method)
        {
            final String key = service + "." + method;
            boolean updated = false;
            while (! updated)
            {
                // get the current value of the map
                TreeMap<String, Event> currMap = mapRef.get();

                // create an updated value of the map, which is the current map plus info about the new key
                TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>()
                {
                    @Override
                    public Event f(Event event)
                    {
                        // Modify the object fields of event, if the map contains the key
                        return event.update(key);
                    }
                    // create a new event if the map does not contain the key
                }, new Event());

                // compare-and-set the new value in .. repeat until this succeeds
                updated = mapRef.compareAndSet(currMap, newMap);
            }
        }

        public List<Event> reset()
        {
            /* replace the reference with a new map */
            TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd));

            /* use the old map to generate the list */
            return new ArrayList<Event>(oldMap.toMutableMap().values());
        }
    }

    private static Logging instance;
    private static long delay = 5 * 60 * 1000;
    private final Timer timer;

    private final EngineLogger vendor1 = new EngineLogger("vendor1");
    private final EngineLogger vendor2 = new EngineLogger("vendor2");

    private Logging()
    {
        timer = new Timer();
        timer.schedule(new TimerTask()
        {
            public void run()
            {
                try
                {
                    saveEntries();
                }
                catch (Throwable t)
                {
                    timer.cancel();
                    timer.purge();
                }
            }
        }, 0, delay);
    }

    public static synchronized Logging getInstance()
    {
        if (instance == null)
        {
            instance = new Logging();
        }
        return instance;
    }

    public void log(String engine, String service, String method)
    {
        if (vendor1.vendor.equals(engine))
        {
            vendor1.log(service, method);
        }
        else if (vendor2.vendor.equals(engine))
        {
            vendor2.log(service, method);
        }
    }

    private void saveEntries()
    {
        Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>();
        engineCalls.put(vendor1.vendor, vendor1.reset());
        engineCalls.put(vendor2.vendor, vendor2.reset());
        DBHandle.logCalls(engineCalls);
    }
}
于 2012-08-28T09:30:47.107 回答