这是我的 DataClientFactory 类。
public class DataClientFactory {
public static IClient getInstance() {
return ClientHolder.INSTANCE;
}
private static class ClientHolder {
private static final DataClient INSTANCE = new DataClient();
static {
new DataScheduler().startScheduleTask();
}
}
}
这是我的 DataClient 类。
public class DataClient implements IClient {
private ExecutorService service = Executors.newFixedThreadPool(15);
private RestTemplate restTemplate = new RestTemplate();
// for initialization purpose
public DataClient() {
try {
new DataScheduler().callDataService();
} catch (Exception ex) { // swallow the exception
// log exception
}
}
@Override
public DataResponse getDataSync(DataKey dataKeys) {
DataResponse response = null;
try {
Future<DataResponse> handle = getDataAsync(dataKeys);
response = handle.get(dataKeys.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// log error
response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
} catch (Exception e) {
// log error
response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
}
return response;
}
@Override
public Future<DataResponse> getDataAsync(DataKey dataKeys) {
Future<DataResponse> future = null;
try {
DataTask dataTask = new DataTask(dataKeys, restTemplate);
future = service.submit(dataTask);
} catch (Exception ex) {
// log error
}
return future;
}
}
我从上面的工厂获取我的客户端实例,如下所示,然后getDataSync
通过传递DataKey
对象来调用方法。DataKey 对象中包含userId
和Timeout
值。现在,在此之后,一旦被调用,就会调用我的DataTask
类来调用方法。call
handle.get
IClient dataClient = DataClientFactory.getInstance();
long userid = 1234l;
long timeout_ms = 500;
DataKey keys = new DataKey.Builder().setUserId(userid).setTimeout(timeout_ms)
.remoteFlag(false).secondaryFlag(true).build();
// call getDataSync method
DataResponse dataResponse = dataClient.getDataSync(keys);
System.out.println(dataResponse);
这是我的 DataTask 类,它具有所有逻辑 -
public class DataTask implements Callable<DataResponse> {
private DataKey dataKeys;
private RestTemplate restTemplate;
public DataTask(DataKey dataKeys, RestTemplate restTemplate) {
this.restTemplate = restTemplate;
this.dataKeys = dataKeys;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
ResponseEntity<String> response = null;
int serialId = getSerialIdFromUserId();
boolean remoteFlag = dataKeys.isRemoteFlag();
boolean secondaryFlag = dataKeys.isSecondaryFlag();
List<String> hostnames = new LinkedList<String>();
Mappings mappings = ClientData.getMappings(dataKeys.whichFlow());
String localPrimaryAdress = null;
String remotePrimaryAdress = null;
String localSecondaryAdress = null;
String remoteSecondaryAdress = null;
// use mappings object to get above Address by using serialId and basis on
// remoteFlag and secondaryFlag populate the hostnames linked list
if (remoteFlag && secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
hostnames.add(localSecondaryHostIPAdress);
hostnames.add(remotePrimaryHostIPAdress);
hostnames.add(remoteSecondaryHostIPAdress);
} else if (remoteFlag && !secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
hostnames.add(remotePrimaryHostIPAdress);
} else if (!remoteFlag && !secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
} else if (!remoteFlag && secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
hostnames.add(localSecondaryHostIPAdress);
}
for (String hostname : hostnames) {
// If host name is null or host name is in local block host list, skip sending request to this host
if (hostname == null || ClientData.isHostBlocked(hostname)) {
continue;
}
try {
String url = generateURL(hostname);
response = restTemplate.exchange(url, HttpMethod.GET, dataKeys.getEntity(), String.class);
// make DataResponse
break;
} catch (HttpClientErrorException ex) {
// make DataResponse
return dataResponse;
} catch (HttpServerErrorException ex) {
// make DataResponse
return dataResponse;
} catch (RestClientException ex) {
// If it comes here, then it means some of the servers are down.
// Add this server to block host list
ClientData.blockHost(hostname);
// log an error
} catch (Exception ex) {
// If it comes here, then it means some weird things has happened.
// log an error
// make DataResponse
}
}
return dataResponse;
}
private String generateURL(final String hostIPAdress) {
// make an url
}
private int getSerialIdFromUserId() {
// get the id
}
}
现在基于userId
,我将获取serialId
然后获取主机名列表,我想根据传递的标志进行调用。然后我迭代hostnames
列表并调用服务器。假设我在链表中有四个主机名(A、B、C、D),那么我将首先调用 A,如果我取回数据,则返回 DataResponse。但是假设如果 A 已关闭,那么我需要立即将 A 添加到阻止列表,以便没有其他线程可以调用 A 主机名。然后调用主机名 B 并取回数据并返回响应(如果 B 也关闭,则重复相同的操作)。
我还有一个后台线程,它每 10 分钟运行一次,它会在我们从工厂获取客户端实例后立即启动,它会解析我的另一个服务 URL 以获取我们不应该调用的块主机名列表。由于它每 10 分钟运行一次,因此任何关闭的服务器,它只会在 10 分钟后获取列表,通常假设如果 A 关闭,那么我的服务将提供 A 作为主机名的阻止列表,并且一旦 A 启动,那么该列表也会在 10 分钟后更新。
这是我的后台线程代码DataScheduler-
public class DataScheduler {
private RestTemplate restTemplate = new RestTemplate();
private static final Gson gson = new Gson();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void startScheduleTask() {
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
callDataService();
} catch (Exception ex) {
// log an error
}
}
}, 0, 10L, TimeUnit.MINUTES);
}
public void callDataService() throws Exception {
String url = null;
// execute the url and get the responseMap from it as a string
parseResponse(responseMap);
}
private void parseResponse(Map<FlowsEnum, String> responses) throws Exception {
// .. some code here to calculate partitionMappings
// block list of hostnames
Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
for (Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
for (String hosts : entry.getValue()) {
blockList.add(hosts);
}
}
if (update) {
ClientData.setAllMappings(partitionMappings);
}
// update the block list of hostnames
if (!DataUtils.isEmpty(responses)) {
ClientData.replaceBlockedHosts(blockList);
}
}
}
这是我的 ClientData 类,它包含主机名阻止列表和 partitionMappings 详细信息(用于获取有效主机名列表)的所有信息。
public class ClientData {
private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = new AtomicReference<ConcurrentHashMap<String, String>>(
new ConcurrentHashMap<String, String>());
// some code here to set the partitionMappings by using CountDownLatch
// so that read is blocked for first time reads
public static boolean isHostBlocked(String hostName) {
return blockedHosts.get().contains(hostName);
}
public static void blockHost(String hostName) {
blockedHosts.get().put(hostName, hostName);
}
public static void replaceBlockedHosts(List<String> blockList) {
ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
for (String hostName : blockList) {
newBlockedHosts.put(hostName, hostName);
}
blockedHosts.set(newBlockedHosts);
}
}
问题陈述:-
当所有服务器都启动时(以 A、B、C、D 为例),上面的代码工作正常,我看不到任何TimeoutException
事情发生,handle.get
但是如果假设一台服务器(A)出现故障,我应该这样做从主线程进行调用,然后我开始看到很多TimeoutException
,我的意思是,发生了大量的客户端超时。
我不确定为什么会这样?一般来说,这不会发生,因为一旦服务器关闭,它就会被添加到 blockList 中,然后没有线程会调用该服务器,而是会尝试列表中的另一个服务器?所以这应该是一个顺利的过程,然后一旦这些服务器启动,blockList 将从后台线程更新,然后您就可以开始拨打电话了。
我上面的代码有什么问题会导致这个问题吗?任何建议都会有很大帮助。
一般来说,我想做的是 - 根据使用映射对象传递的用户 ID 创建主机名列表。然后调用第一个主机名并获取响应。但是,如果该主机名已关闭,则添加到阻止列表并调用列表中的第二个主机名。
这是我看到的 Stacktrace -
java.util.concurrent.TimeoutException\n\tat java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
java.util.concurrent.FutureTask.get(FutureTask.java:119)\n\tat com.host.client.DataClient.getDataSync(DataClient.java:20)\n\tat
注意:对于多个 userId,我们可以拥有相同的服务器,这意味着服务器 A 可以解析到多个 userId。