我有一个处理原始数据并将其保存到数据库的应用程序。执行 1 个线程所需的时间不到 100 毫秒。该应用程序每 20 秒从大约 200 个设备接收原始数据。一段时间后,应用程序挂起并且无法访问。在 wildfly 中查看 Busy Task Thread Count 时,我们发现 Busy Task Thread Count 达到了 128 个线程的限制。
即使处理时间非常短,我们也无法弄清楚为什么会发生这种情况。任何人都可以帮助解决这个问题。提前致谢。
@Controller
@EnableAsync
public class DataController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private IDataService dataService;
@Autowired
IDeviceService deviceService;
@Autowired
HttpServletResponse response;
@Autowired
HttpServletRequest request;
@Autowired
private IDeviceValidator deviceValidator;
@Autowired
private IJobService jobService;
@Autowired
private IDataAssembler dataAssembler;
@RequestMapping(value = { "/device_dat/auth/auth_tocken/imei/{imei_no}",
"/device_data/auth/auth_token/imei/{imei_no}" }, method = RequestMethod.POST, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
@ResponseBody
public Object postDataText(@PathVariable(value = "imei_no") String imeiNo, @RequestBody String dataString) {
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::"
+ Thread.currentThread().getId() + "::postDataText::entry::"+ dataString+"::"+ sdf.format(startTimeMillis));
try {
Device device = (Device) deviceValidator.validateImeiNo(imeiNo);
if (device != null) {
DataVO dataVO = new DataVO();
dataVO.setImeiNo(imeiNo);
dataVO.setSerialNo(device.getSerialNo());
device.setLastUpdatedTime(new Date());
dataString = java.net.URLDecoder.decode(dataString, StandardCharsets.UTF_8.name());
dataVO.setSensorDataList(dataString);
List<Data> datas = dataAssembler.assembleData(dataVO, device);
String prefix = dataString.substring(0, 11);
deviceService.updateLastUpdatedData(datas, null, request.getSession().getId(), imeiNo, device);
dataService.updateDeviceDataAndInsertToDB(datas, request.getSession().getId(), imeiNo, device);
} else {
Map<String, String> data = new HashMap<String, String>();
data.put("message", "Device not Registered");
ResponseVO responseVO = new ResponseVO(ResponseTypeEnum.FAILURE.toString(), 400, data);
return responseVO;
}
} catch (Exception e) {
log.error("Error occured on postDataText method:" + e.getMessage() + "\ndataString was :" + dataString);
}
// response.addHeader("Baeldung-Example-Header", "Value-HttpServletResponse");
Long endTimeMillis = System.currentTimeMillis();
log.error(
"PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::postDataText::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
return null;
}
}
---------------------------------------------------------------------
@Service
@EnableAsync
public class DataServiceImpl implements IDataService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
IDeviceService deviceService;
@Autowired
private DataDaoImpl dataDaoImpl;
@Autowired
private IDataRepository dataRepository;
@Autowired
private IApplicationService applicationService;
@Override
public Device updateDeviceDataAndInsertToDB(List<Data> datas, String prefix, String imeiNo, Device device) {
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateDeviceDataAndInsertToDB::entry::" + sdf.format(startTimeMillis));
try {
List<Data> upDatedList = new ArrayList<Data>();
datas.forEach(data -> {
if (data.getSourceDate() == null) {
data.setSourceDate(data.getCreatedDate());
}
deviceService.updateLastUpdatedTime(data.getSerialNo(),device);
data.setMetaData(device.getMetaData());
upDatedList.add(data);
});
dataRepository.insert(upDatedList);
} catch (Exception e) {
log.error("Method:save" + e.getMessage() + ", " + datas);
}
Long endTimeMillis = System.currentTimeMillis();
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateDeviceDataAndInsertToDB::exit::" + sdf.format(endTimeMillis) + "::"
+ (endTimeMillis - startTimeMillis));
return device;
}
}
-------------------------------------------------------------------------------------------------------
@Service
@EnableAsync
public class DeviceServiceImpl implements IDeviceService {
public final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
IDeviceDao deviceDao;
@Autowired
IDeviceRepository deviceRepository;
@Autowired
IApplicationService applicationService;
@Autowired
RestTemplate restTemplate;
@Autowired
IDataService dataService;
@Override
@Async
public void updateLastUpdatedData(List<Data> datas, Map<String, Object> metaData, String prefix, String imeiNo,
Device device) {
//sAsyncRestTemplate restTemplate = new AsyncRestTemplate();
/*
* WebClient client = WebClient.create(); Stream<Data> stringStream =
* datas.stream(); Flux<Data> fluxFromStream = Flux.fromStream(stringStream);
*/
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateLastUpdatedData::entry::" + sdf.format(startTimeMillis));
log.trace("Enter updateLastUpdatedData");
try {
Map<String, Object> response = new HashMap<String, Object>();
String applicationName = "gtest";
String urlString = "http://localhost:8080/" + applicationName + "/devices/";
String deviceSerialNo = device.getSerialNo();
String imei_no = imeiNo;
Map<String, Object> requestBody = new HashMap<String, Object>();
// For batch packet Device Follows LIFO
Data data = datas.get(0);
requestBody = (Map<String, Object>) data.getSensorDataList();
requestBody.put("source_date", data.getSourceDate());
if (metaData != null) {
requestBody.put("last_known_address", metaData.get("last_known_address"));
}
Map<String, Object> d = (Map<String, Object>) data.getSensorDataList();
Long count = 0L, emergencyCount = 0L;
for (Data datax : datas) {
Map<String, Object> x = (Map<String, Object>) datax.getSensorDataList();
try {
if (x.get("packet_type").toString().contains("Alert")) {
count = count + 1;
}
if (x.get("packet_type").toString().contains("Emergency button wire disconnect/wire-cut")) {
emergencyCount = emergencyCount + 1;
}
} catch (Exception e) {
// log.error("method:updateLastUpdatedData,"+e.getMessage()+"d"+d+"
// ,data:"+data);
}
}
requestBody.put("alert_count", count);
requestBody.put("emergency_alert_count", emergencyCount);
if (deviceSerialNo != null) {
urlString += deviceSerialNo;
restTemplate.postForObject(urlString, requestBody, Map.class);
}
} catch (Exception e) {
log.error("method:updateLastUpdatedData,"+e.getMessage()+", datas"+datas);
}
log.trace("Exit updateLastUpdatedData");
Long endTimeMillis = System.currentTimeMillis();
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateLastUpdatedData::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
}
}
}
--------------------------------------------------------------------------------------------
@Repository
public class DataRepository implements IDataRepository {
public final Logger log = LoggerFactory.getLogger(this.getClass());
@Override
public void insert(List<Data> upDatedList) {
log.trace("Enter insert");
Query query = new Query();
query.maxTimeMsec(10);
MongoOperations mongoOperations = mongoTemplate;
mongoTemplate.insert(upDatedList, Data.class);
log.trace("Exit insert");
}
}