我从进行顺序 HTTP 调用切换到 4 个 REST 服务,然后使用 commonj4 工作管理器任务执行器进行 4 个同时调用。我正在使用 WebLogic 12c。这个新代码适用于我的开发环境,但在负载条件下的测试环境中,有时在没有负载的情况下,结果映射不会填充所有结果。日志记录表明每个工作项确实收到了结果。这可能是 ConcurrentHashMap 的问题吗?在 IBM 的这个示例中,他们使用自己的 Work 版本,并且有一个 getData() 方法,尽管它不喜欢该方法确实存在于他们的类定义中。我遵循了一个不同的示例,该示例仅使用了 Work 类,但没有演示如何将数据从这些线程中获取到主线程中。我应该使用 execute() 而不是 schedule() 吗?API 似乎没有很好的文档记录。卡住的线程超时足够高。component.processInbound() 实际上包含 HTTP 调用的代码,但我的问题不存在,因为我可以切换回下面类的同步版本并且没有任何问题。
我的代码:
public class WorkManagerAsyncLinkedComponentRouter implements
MessageDispatcher<Object, Object> {
private List<Component<Object, Object>> components;
protected ConcurrentHashMap<String, Object> workItemsResultsMap;
protected ConcurrentHashMap<String, Exception> componentExceptionsInThreads;
...
//components is populated at this point with one component for each REST call to be made.
public Object route(final Object message) throws RouterException {
...
try {
workItemsResultsMap = new ConcurrentHashMap<String, Object>();
componentExceptionsInThreads = new ConcurrentHashMap<String, Exception>();
final String parentThreadID = Thread.currentThread().getName();
List<WorkItem> producerWorkItems = new ArrayList<WorkItem>();
for (final Component<Object, Object> component : this.components) {
producerWorkItems.add(workManagerTaskExecutor.schedule(new Work() {
public void run() {
//ExecuteThread th = (ExecuteThread) Thread.currentThread();
//th.setName(component.getName());
LOG.info("Child thread " + Thread.currentThread().getName() +" Parent thread: " + parentThreadID + " Executing work item for: " + component.getName());
try {
Object returnObj = component.processInbound(message);
if (returnObj == null)
LOG.info("Object returned to work item is null, not adding to producer components results map, for this producer: "
+ component.getName());
else {
LOG.info("Added producer component thread result for: "
+ component.getName());
workItemsResultsMap.put(component.getName(), returnObj);
}
LOG.info("Finished executing work item for: " + component.getName());
} catch (Exception e) {
componentExceptionsInThreads.put(component.getName(), e);
}
}
...
}));
} // end loop over producer components
// Block until all items are done
workManagerTaskExecutor.waitForAll(producerWorkItems, stuckThreadTimeout);
LOG.info("Finished waiting for all producer component threads.");
if (componentExceptionsInThreads != null
&& componentExceptionsInThreads.size() > 0) {
...
}
List<Object> resultsList = new ArrayList<Object>(workItemsResultsMap.values());
if (resultsList.size() == 0)
throw new RouterException(
"The producer thread results are all empty. The threads were likely not created. In testing this was observed when either 1)the system was almost out of memory (Perhaps the there is not enough memory to create a new thread for each producer, for this REST request), or 2)Timeouts were reached for all producers.");
//** The problem is identified here. The results in the ConcurrentHashMap aren't the number expected .
if (workItemsResultsMap.size() != this.components.size()) {
StringBuilder sb = new StringBuilder();
for (String str : workItemsResultsMap.keySet()) {
sb.append(str + " ");
}
throw new RouterException(
"Did not receive results from all threads within the thread timeout period. Only retrieved:"
+ sb.toString());
}
LOG.info("Returning " + String.valueOf(resultsList.size()) + " results.");
LOG.debug("List of returned feeds: " + String.valueOf(resultsList));
return resultsList;
}
...
}
}