2

我正在尝试实现锁定,我希望通过它来避免在我进行写入时发生读取。

我的要求是:

  • 读取阻塞,直到第一次设置所有两个映射。
  • 现在第二次,如果我正在更新地图,我仍然可以返回所有两个旧地图值(在所有两个地图上完成更新之前),或者它应该阻止并在更新完成时返回所有新的两个地图值在所有两张地图上。

因为我有两个 Maps - primaryMappingsecondaryMapping所以它应该返回两个更新地图的所有新值,或者它应该返回地图的所有旧值。基本上,在更新时我不想返回primaryMapping有旧值,secondaryMapping有新值。它应该是一致的,要么应该返回旧值,要么应该在更新地图后返回新值。就我而言,地图更新将在 7 或 8 个月内发生一次(很少)。我为此使用倒计时锁存器,到目前为止它工作正常,没有任何问题。

现在我有两个流程,如下所示: 对于每个流程,我都有一个 URL,我们可以从中获取上述两个地图的数据。一般来说,我将有两个流程的两个映射,因此与设备流程相比,我们将为 PROCESS 流程的这两个映射具有不同的值。

public enum FlowType {
    PROCESS, DEVICE;
}

我有一个后台线程每 5 分钟运行一次,它从每个 FLOW url 获取数据,并在有更新时填充这两个地图。当应用程序第一次启动时,它会更新映射,之后,它会在 7-8 个月后更新映射。这并不意味着两个流映射会同时改变。PROCESS 映射可能发生了变化,但 DEVICE 映射没有变化。

public class DataScheduler {

    private RestTemplate restTemplate = new RestTemplate();
    private static final String PROCESS_URL = "http://process_flow_url/";
    private static final String DEVICE_URL = "http://device_flow_url/";
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void startScheduleTask() {
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    callService();
                } catch (Exception ex) {
                    // logging exception here using logger
                }
            }
        }, 0, 5, TimeUnit.MINUTES);
    }

    public void callService() throws Exception {
        String url = null;
        Map<FlowType, String> holder = new HashMap<FlowType, String>();
        for (FlowType flow : FlowType.values()) {
            try {
                url = getURL(flow);
                String response = restTemplate.getForObject(url, String.class);
                holder.put(flow, response);
            } catch (RestClientException ex) {
                // logging exception here using logger      
            }
        }
        parseResponse(holder);
    }

    private void parseResponse(Map<FlowType, String> responses) throws Exception {
        Map<FlowType, Mapping> partitionMapper = new HashMap<FlowType, Mapping>();
        boolean update = false;

        for (Map.Entry<FlowType, String> responseEntry : responses.entrySet()) {
            FlowType flow = responseEntry.getKey();
            String response = responseEntry.getValue();

            Map<String, Map<Integer, Integer>> primaryMapping = new HashMap<>();
            Map<String, Map<Integer, Integer>> secondaryMapping = new HashMap<>();

            if (!DataUtils.isEmpty(response)) {
                try (Scanner scanner = new Scanner(response)) {
                    boolean hasProcess = Boolean.parseBoolean(scanner.nextLine().trim().substring(HAS_PROCESS_LEN));
                    if (hasProcess) {
                        update = true;

                        // some code    
                        partitionMapper.put(flow, PartitionHolder.createMapping(
                                            primaryMapping, secondaryMapping));
                    }
                }
            }
        }
        // if there is any update, then only update the mappings, otherwise not.
        if (update) {
            PartitionHolder.setMappings(partitionMapper);
        }
    }
}

正如您在上面看到的,setMappings如果任何流映射已更新,它会通过调用方法来更新映射。

下面是我的 PartitionHolder 类:

public class PartitionHolder {

    public static class Mapping {
        public final Map<String, Map<Integer, Integer>> primaryMapping;
        public final Map<String, Map<Integer, Integer>> secondaryMapping;

        public Mapping(Map<String, Map<Integer, Integer>> primaryMapping,
                Map<String, Map<Integer, Integer>> secondaryMapping) {
            this.primaryMapping = primaryMapping;
            this.secondaryMapping = secondaryMapping;
        }

        // getters here
    }

    private static final AtomicReference<Map<FlowType, Mapping>> mappingsHolder = new AtomicReference<Map<FlowType, Mapping>>();
    private static final CountDownLatch hasInitialized = new CountDownLatch(1);

    public static Mapping getFlowMapping(FlowType flowType) {
        try {
            hasInitialized.await();
            return mappingsHolder.get().get(flowType);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void setMappings(Map<FlowType, Mapping> newMapData) {
        mappingsHolder.set(newMapData);
        hasInitialized.countDown();
    }

    public static Mapping createMapping(
            Map<String, Map<Integer, Integer>> primaryMapping,
            Map<String, Map<Integer, Integer>> secondaryMapping) {
        return new Mapping(primaryMapping, secondaryMapping);
    }
}

现在这是我PartitionHolder在主线程中使用类的方式。在一次调用中,我们将只获得一个流的映射。在下面的代码dataKey.getFlowType()中可以是过程或设备。

@Override
public DataResponse call() throws Exception {
    Mapping mappings = PartitionHolder.getFlowMapping(dataKey.getFlowType());
    // use mappings object here
}

现在在我上面的代码中,如您所见,setMappings如果任何流映射已更新,我将通过调用方法来更新映射。有两种情况可能发生:

  • 当应用程序第一次启动时,它可以正常工作,没有任何问题,因为它会更新两个流的映射。
  • 现在第二次,假设对于 PROCESS 流,映射已更新,但对于 DEVICE 流映射尚未更新,那么在这种情况下,它将通过调用setMappings将覆盖的方法来更新整体映射mappingsHolder。这种方法的问题是,对于 DEVICE 流,我将在调用时开始获取空映射,getFlowMapping因为它会将 DEVICE 映射覆盖到空映射。正确的?

我怎样才能避免我的第二个问题?有没有更好的方法来解决这个问题,而不是使用 Map 和 key 作为 FlowType?我需要在这里使用工厂模式来解决这个问题还是其他更好的方法?

4

2 回答 2

1

ConcurrentHashMap是线程安全且相对快速的,因此可以简化问题:

private static final ConcurrentHashMap<FlowType, Mapping> mappingsHolder = 
        new ConcurrentHashMap<FlowType, Mapping>();

public static void setMappings(FlowType flowType, Mapping newMapData) {

    mappingsHolder.put(flowType, newMapData);
    hasInitialized.countDown();
}

public static Mapping getFlowMapping(FlowType flowType) {

    try {
        hasInitialized.await();
        return mappingsHolder.get(flowType);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
}

那应该解决第二个问题。

于 2015-06-28T22:49:11.283 回答
1

我同意 vanOekel 的观点,这ConcurrentHashMap将有助于解决问题 - 在您DataScheduler执行try (Scanner scanner = new Scanner(response))块内的地图更新时 - 这样,如果抛出异常,则地图将保持不变。

添加一个

public static boolean isInitialized() {
  return hasInitialized.getCount() == 0;
}

方法PartitionHolder,然后修改callService以在映射尚未初始化时执行重试(这将永远重试,或者您可以重试N次数,然后使用默认数据或中止程序或第三个选项)。

public void callService() throws Exception {
    boolean doRetry = true;
    while(doRetry) {
        String url = null;
        Map<FlowType, String> holder = new HashMap<FlowType, String>();
        for (FlowType flow : FlowType.values()) {
            try {
                url = getURL(flow);
                String response = restTemplate.getForObject(url, String.class);
                holder.put(flow, response);
            } catch (RestClientException ex) {
                // logging exception here using logger      
            }
        }
        try {
            parseResponse(holder);
            doRetry = false;
        } catch(Exception e) {
            doRetry = !PartitionHolder.isInitialized();
            if(PartitionHolder.isInitialized()) {
                throw e;
            }
        }
    }
}
于 2015-06-29T02:45:27.333 回答