4

这是我的 DataClientFactory 类。

public class DataClientFactory {
    public static IClient getInstance() {
        return ClientHolder.INSTANCE;
    }

    private static class ClientHolder {
        private static final DataClient INSTANCE = new DataClient();
        static {
            new DataScheduler().startScheduleTask();
        }
    }
}

这是我的 DataClient 类。

public class DataClient implements IClient {

    private ExecutorService service = Executors.newFixedThreadPool(15);
    private RestTemplate restTemplate = new RestTemplate();

    // for initialization purpose
    public DataClient() {
        try {
            new DataScheduler().callDataService();
        } catch (Exception ex) { // swallow the exception
            // log exception
        }
    }

    @Override
    public DataResponse getDataSync(DataKey dataKeys) {
        DataResponse response = null;
        try {
            Future<DataResponse> handle = getDataAsync(dataKeys);
            response = handle.get(dataKeys.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // log error
            response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
        } catch (Exception e) {
            // log error
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        }

        return response;
    }

    @Override
    public Future<DataResponse> getDataAsync(DataKey dataKeys) {
        Future<DataResponse> future = null;
        try {
            DataTask dataTask = new DataTask(dataKeys, restTemplate);
            future = service.submit(dataTask);
        } catch (Exception ex) {
            // log error
        }

        return future;
    }
}

我从上面的工厂获取我的客户端实例,如下所示,然后getDataSync通过传递DataKey对象来调用方法。DataKey 对象中包含userIdTimeout值。现在,在此之后,一旦被调用,就会调用我的DataTask类来调用方法。callhandle.get

IClient dataClient = DataClientFactory.getInstance();

long userid = 1234l;
long timeout_ms = 500;

DataKey keys = new DataKey.Builder().setUserId(userid).setTimeout(timeout_ms)
            .remoteFlag(false).secondaryFlag(true).build();

// call getDataSync method
DataResponse dataResponse = dataClient.getDataSync(keys);
System.out.println(dataResponse);

这是我的 DataTask 类,它具有所有逻辑 -

public class DataTask implements Callable<DataResponse> {

    private DataKey dataKeys;
    private RestTemplate restTemplate;

    public DataTask(DataKey dataKeys, RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
        this.dataKeys = dataKeys;
    }

    @Override
    public DataResponse call() {

        DataResponse dataResponse = null;
        ResponseEntity<String> response = null;

        int serialId = getSerialIdFromUserId();

        boolean remoteFlag = dataKeys.isRemoteFlag();
        boolean secondaryFlag = dataKeys.isSecondaryFlag();

        List<String> hostnames = new LinkedList<String>();

        Mappings mappings = ClientData.getMappings(dataKeys.whichFlow());

        String localPrimaryAdress = null;
        String remotePrimaryAdress = null;
        String localSecondaryAdress = null;
        String remoteSecondaryAdress = null;

        // use mappings object to get above Address by using serialId and basis on 
        // remoteFlag and secondaryFlag populate the hostnames linked list

        if (remoteFlag && secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
            hostnames.add(localSecondaryHostIPAdress);
            hostnames.add(remotePrimaryHostIPAdress);
            hostnames.add(remoteSecondaryHostIPAdress);
        } else if (remoteFlag && !secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
            hostnames.add(remotePrimaryHostIPAdress);
        } else if (!remoteFlag && !secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
        } else if (!remoteFlag && secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
            hostnames.add(localSecondaryHostIPAdress);
        }

        for (String hostname : hostnames) {
            // If host name is null or host name is in local block host list, skip sending request to this host
            if (hostname == null || ClientData.isHostBlocked(hostname)) {
                continue;
            }

            try {
                String url = generateURL(hostname);
                response = restTemplate.exchange(url, HttpMethod.GET, dataKeys.getEntity(), String.class);

                // make DataResponse

                break;

            } catch (HttpClientErrorException ex) {
                // make DataResponse
                return dataResponse;
            } catch (HttpServerErrorException ex) {
                // make DataResponse
                return dataResponse;
            } catch (RestClientException ex) {
                // If it comes here, then it means some of the servers are down.
                // Add this server to block host list 
                ClientData.blockHost(hostname);
                // log an error

            } catch (Exception ex) {
                // If it comes here, then it means some weird things has happened.
                // log an error
                // make DataResponse
            }
        }

        return dataResponse;
    }

    private String generateURL(final String hostIPAdress) {
        // make an url
    }


    private int getSerialIdFromUserId() {
        // get the id
    }
}

现在基于userId,我将获取serialId然后获取主机名列表,我想根据传递的标志进行调用。然后我迭代hostnames列表并调用服务器。假设我在链表中​​有四个主机名(A、B、C、D),那么我将首先调用 A,如果我取回数据,则返回 DataResponse。但是假设如果 A 已关闭,那么我需要立即将 A 添加到阻止列表,以便没有其他线程可以调用 A 主机名。然后调用主机名 B 并取回数据并返回响应(如果 B 也关闭,则重复相同的操作)。

我还有一个后台线程,它每 10 分钟运行一次,它会在我们从工厂获取客户端实例后立即启动,它会解析我的另一个服务 URL 以获取我们不应该调用的块主机名列表。由于它每 10 分钟运行一次,因此任何关闭的服务器,它只会在 10 分钟后获取列表,通常假设如果 A 关闭,那么我的服务将提供 A 作为主机名的阻止列表,并且一旦 A 启动,那么该列表也会在 10 分钟后更新。

这是我的后台线程代码DataScheduler-

public class DataScheduler {

    private RestTemplate restTemplate = new RestTemplate();
    private static final Gson gson = new Gson();

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void startScheduleTask() {
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    callDataService();
                } catch (Exception ex) {
                    // log an error
                }
            }
        }, 0, 10L, TimeUnit.MINUTES);
    }

    public void callDataService() throws Exception {
        String url = null;

        // execute the url and get the responseMap from it as a string

        parseResponse(responseMap);
    }


    private void parseResponse(Map<FlowsEnum, String> responses) throws Exception {

        // .. some code here to calculate partitionMappings

        // block list of hostnames 
        Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
        for (Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
            for (String hosts : entry.getValue()) {
                blockList.add(hosts);
            }
        }

        if (update) {
            ClientData.setAllMappings(partitionMappings);
        }

        // update the block list of hostnames
        if (!DataUtils.isEmpty(responses)) {
            ClientData.replaceBlockedHosts(blockList);
        }
    }
}

这是我的 ClientData 类,它包含主机名阻止列表和 partitionMappings 详细信息(用于获取有效主机名列表)的所有信息。

public class ClientData {

    private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = new AtomicReference<ConcurrentHashMap<String, String>>(
            new ConcurrentHashMap<String, String>());


    // some code here to set the partitionMappings by using CountDownLatch 
    // so that read is blocked for first time reads

    public static boolean isHostBlocked(String hostName) {
        return blockedHosts.get().contains(hostName);
    }

    public static void blockHost(String hostName) {
        blockedHosts.get().put(hostName, hostName);
    }

    public static void replaceBlockedHosts(List<String> blockList) {
        ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
        for (String hostName : blockList) {
            newBlockedHosts.put(hostName, hostName);
        }
        blockedHosts.set(newBlockedHosts);
    }
}

问题陈述:-

当所有服务器都启动时(以 A、B、C、D 为例),上面的代码工作正常,我看不到任何TimeoutException事情发生,handle.get但是如果假设一台服务器(A)出现故障,我应该这样做从主线程进行调用,然后我开始看到很多TimeoutException,我的意思是,发生了大量的客户端超时。

我不确定为什么会这样?一般来说,这不会发生,因为一旦服务器关闭,它就会被添加到 blockList 中,然后没有线程会调用该服务器,而是会尝试列表中的另一个服务器?所以这应该是一个顺利的过程,然后一旦这些服务器启动,blockList 将从后台线程更新,然后您就可以开始拨打电话了。

我上面的代码有什么问题会导致这个问题吗?任何建议都会有很大帮助。

一般来说,我想做的是 - 根据使用映射对象传递的用户 ID 创建主机名列表。然后调用第一个主机名并获取响应。但是,如果该主机名已关闭,则添加到阻止列表并调用列表中的第二个主机名。

这是我看到的 Stacktrace -

java.util.concurrent.TimeoutException\n\tat java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
java.util.concurrent.FutureTask.get(FutureTask.java:119)\n\tat com.host.client.DataClient.getDataSync(DataClient.java:20)\n\tat 

注意:对于多个 userId,我们可以拥有相同的服务器,这意味着服务器 A 可以解析到多个 userId。

4

2 回答 2

0

您问了一些建议,所以这里有一些建议:

1.)意外的返回值
方法意外返回FALSE

if (ClientData.isHostBlocked(hostname)) //this may return always false! please check

2.)异常处理
真的确定会发生RestClientException吗?
只有当这个异常发生时,主机才会被添加到阻止列表中!
您发布的代码似乎忽略了日志记录(已被注释掉!)

        ...catch (HttpClientErrorException ex) {
            // make DataResponse
            return dataResponse;
        } catch (HttpServerErrorException ex) {
            // make DataResponse
            return dataResponse;
        } catch (RestClientException ex) {
            // If it comes here, then it means some of the servers are down.
            // Add this server to block host list 
            ClientData.blockHost(hostname);
            // log an error

        } catch (Exception ex) {
            // If it comes here, then it means some weird things has happened.
            // log an error
            // make DataResponse
        }
于 2014-09-06T20:25:23.407 回答
0

在 DataClient 类中,在以下行:

public class DataClient implements IClient {

----code code---

        Future<DataResponse> handle = getDataAsync(dataKeys);

//BELOW LINE IS PROBLEM

        response = handle.get(dataKeys.getTimeout(), TimeUnit.MILLISECONDS); // <--- HERE
    } catch (TimeoutException e) {
        // log error
        response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
    } catch (Exception e) {
        // log error
        response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);

----code code-----

您已为 handle.get(...) 分配了一个超时,这在您的 REST 连接可以响应之前超时。其余连接本身可能会超时,也可能不会超时,但是由于您在线程执行完成之前就超时了future的get方法,因此阻塞主机没有可见的效果,而调用方法中的代码DataTask 可能按预期执行。希望这可以帮助。

于 2014-09-04T07:17:06.893 回答