0

我有一个处理原始数据并将其保存到数据库的应用程序。单个线程完成一次处理所需的时间不到 100 毫秒。该应用程序每 20 秒从大约 200 个设备接收一次原始数据。运行一段时间后，应用程序会挂起且无法访问。在 WildFly 中查看 Busy Task Thread Count 时，我们发现该值已达到 128 个线程的上限。

尽管处理时间非常短，我们仍然无法弄清楚为什么会发生这种情况。有人能帮忙解决这个问题吗？提前致谢。

/**
 * HTTP entry point for raw device payloads.
 *
 * <p>Accepts a form-encoded POST per device (identified by IMEI), validates the
 * device, assembles the payload into {@code Data} records, and hands them to
 * {@code IDeviceService} and {@code IDataService}.
 */
@Controller
@EnableAsync
public class DataController {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private IDataService dataService;

    @Autowired
    IDeviceService deviceService;

    @Autowired
    HttpServletResponse response;

    @Autowired
    HttpServletRequest request;

    @Autowired
    private IDeviceValidator deviceValidator;

    @Autowired
    private IJobService jobService;

    @Autowired
    private IDataAssembler dataAssembler;


    /**
     * Receives one raw payload from a device and stores it.
     *
     * <p>The payload is URL-decoded as UTF-8, assembled into {@code Data} records,
     * then passed first to {@code deviceService.updateLastUpdatedData} and then to
     * {@code dataService.updateDeviceDataAndInsertToDB}. Any exception raised while
     * processing is logged (with its stack trace) and not propagated.
     *
     * @param imeiNo     IMEI taken from the request path
     * @param dataString raw request body as sent by the device
     * @return a {@code ResponseVO} with status 400 when the validator returns no
     *         device for the IMEI; {@code null} in every other case, including
     *         when processing failed
     */
    @RequestMapping(value = { "/device_dat/auth/auth_tocken/imei/{imei_no}",
            "/device_data/auth/auth_token/imei/{imei_no}" }, method = RequestMethod.POST, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
    @ResponseBody
    public Object postDataText(@PathVariable(value = "imei_no") String imeiNo, @RequestBody String dataString) {
        long startTimeMillis = System.currentTimeMillis();
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // The session id is only used as a correlation prefix in the PERF log lines.
        // NOTE(review): getSession() creates a session when the client sends no
        // session cookie; if devices never send one, every request allocates a new
        // session - confirm this is intended.
        String sessionId = request.getSession().getId();
        log.error("PERF::" + sessionId + "::" + imeiNo + "::"
                + Thread.currentThread().getId() + "::postDataText::entry::" + dataString + "::"
                + sdf.format(startTimeMillis));
        try {
            Device device = (Device) deviceValidator.validateImeiNo(imeiNo);
            if (device == null) {
                Map<String, String> data = new HashMap<String, String>();
                data.put("message", "Device not Registered");
                return new ResponseVO(ResponseTypeEnum.FAILURE.toString(), 400, data);
            }

            DataVO dataVO = new DataVO();
            dataVO.setImeiNo(imeiNo);
            dataVO.setSerialNo(device.getSerialNo());
            device.setLastUpdatedTime(new Date());
            // Reassign the parameter so the error log below shows the decoded payload
            // whenever decoding succeeded.
            dataString = java.net.URLDecoder.decode(dataString, StandardCharsets.UTF_8.name());
            dataVO.setSensorDataList(dataString);
            List<Data> datas = dataAssembler.assembleData(dataVO, device);
            // No substring(0, 11) here: its result was never used, and on payloads
            // shorter than 11 characters it threw and silently skipped both calls below.
            deviceService.updateLastUpdatedData(datas, null, sessionId, imeiNo, device);
            dataService.updateDeviceDataAndInsertToDB(datas, sessionId, imeiNo, device);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            log.error("Error occured on postDataText method:" + e.getMessage() + "\ndataString was :" + dataString,
                    e);
        }
        long endTimeMillis = System.currentTimeMillis();
        log.error("PERF::" + sessionId + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::postDataText::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
        return null;
    }
}
---------------------------------------------------------------------
/**
 * Service that finalizes assembled {@code Data} records and inserts them
 * through {@code IDataRepository}.
 */
@Service
@EnableAsync
public class DataServiceImpl implements IDataService {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    IDeviceService deviceService;

    @Autowired
    private DataDaoImpl dataDaoImpl;

    @Autowired
    private IDataRepository dataRepository;

    @Autowired
    private IApplicationService applicationService;

    /**
     * Fills in missing fields on each record and inserts the whole batch.
     *
     * <p>For every record: a missing source date is replaced by the created date,
     * {@code deviceService.updateLastUpdatedTime} is invoked, and the device's
     * metadata is copied onto the record. Failures are logged with their stack
     * trace and not propagated.
     *
     * @param datas  records to store
     * @param prefix correlation prefix used only in the PERF log lines
     * @param imeiNo device IMEI, used only in the PERF log lines
     * @param device device the records belong to
     * @return the {@code device} argument, unchanged by this method itself
     */
    @Override
    public Device updateDeviceDataAndInsertToDB(List<Data> datas, String prefix, String imeiNo, Device device) {
        long startTimeMillis = System.currentTimeMillis();
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateDeviceDataAndInsertToDB::entry::" + sdf.format(startTimeMillis));

        try {
            List<Data> updatedList = new ArrayList<Data>(datas.size());
            for (Data data : datas) {
                if (data.getSourceDate() == null) {
                    data.setSourceDate(data.getCreatedDate());
                }
                // NOTE(review): invoked once per record with the same device; if the
                // call is not cheap it could be hoisted out of the loop - confirm
                // against the IDeviceService implementation.
                deviceService.updateLastUpdatedTime(data.getSerialNo(), device);

                data.setMetaData(device.getMetaData());
                updatedList.add(data);
            }
            dataRepository.insert(updatedList);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            log.error("Method:save" + e.getMessage() + ", " + datas, e);
        }
        long endTimeMillis = System.currentTimeMillis();
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateDeviceDataAndInsertToDB::exit::" + sdf.format(endTimeMillis) + "::"
                + (endTimeMillis - startTimeMillis));
        return device;
    }
}

-------------------------------------------------------------------------------------------------------
@Service
@EnableAsync
public class DeviceServiceImpl implements IDeviceService {

    public final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    IDeviceDao deviceDao;

    @Autowired
    IDeviceRepository deviceRepository;

    @Autowired
    IApplicationService applicationService;

    @Autowired
    RestTemplate restTemplate;

    @Autowired
    IDataService dataService;



    @Override
    @Async
    public void updateLastUpdatedData(List<Data> datas, Map<String, Object> metaData, String prefix, String imeiNo,
            Device device) {
        //sAsyncRestTemplate restTemplate = new AsyncRestTemplate();
        /*
         * WebClient client = WebClient.create(); Stream<Data> stringStream =
         * datas.stream(); Flux<Data> fluxFromStream = Flux.fromStream(stringStream);
         */

        Long startTimeMillis = System.currentTimeMillis();
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateLastUpdatedData::entry::" + sdf.format(startTimeMillis));

        log.trace("Enter updateLastUpdatedData");
        try {
            Map<String, Object> response = new HashMap<String, Object>();
            String applicationName = "gtest";

            String urlString = "http://localhost:8080/" + applicationName + "/devices/";

            String deviceSerialNo = device.getSerialNo();
            String imei_no = imeiNo;
            Map<String, Object> requestBody = new HashMap<String, Object>();
            // For batch packet Device Follows LIFO
            Data data = datas.get(0);
            requestBody = (Map<String, Object>) data.getSensorDataList();
            requestBody.put("source_date", data.getSourceDate());
            if (metaData != null) {
                requestBody.put("last_known_address", metaData.get("last_known_address"));
            }
            Map<String, Object> d = (Map<String, Object>) data.getSensorDataList();
            Long count = 0L, emergencyCount = 0L;

            for (Data datax : datas) {

                Map<String, Object> x = (Map<String, Object>) datax.getSensorDataList();
                try {
                    if (x.get("packet_type").toString().contains("Alert")) {
                        count = count + 1;
                    }
                    if (x.get("packet_type").toString().contains("Emergency button wire disconnect/wire-cut")) {
                        emergencyCount = emergencyCount + 1;
                    }
                } catch (Exception e) {
                    // log.error("method:updateLastUpdatedData,"+e.getMessage()+"d"+d+"
                    // ,data:"+data);
                }

            }
            requestBody.put("alert_count", count);
            requestBody.put("emergency_alert_count", emergencyCount);
            if (deviceSerialNo != null) {

                urlString += deviceSerialNo;

                restTemplate.postForObject(urlString, requestBody, Map.class);

            }


        } catch (Exception e) {
             log.error("method:updateLastUpdatedData,"+e.getMessage()+", datas"+datas);
        }
        log.trace("Exit updateLastUpdatedData");
        Long endTimeMillis = System.currentTimeMillis();
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateLastUpdatedData::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));

    }
    }
    }
    --------------------------------------------------------------------------------------------

/**
 * Repository that writes {@code Data} records through {@code mongoTemplate}.
 *
 * <p>NOTE(review): {@code mongoTemplate} is not declared in this excerpt; it is
 * assumed to be provided elsewhere in the real class - confirm.
 */
@Repository
public class DataRepository implements IDataRepository {

    public final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Inserts the given records in a single {@code mongoTemplate.insert} call.
     *
     * <p>No time limit is applied to the insert. The previous code built a
     * {@code Query} with {@code maxTimeMsec(10)} but never used it, so it had no
     * effect and has been removed.
     *
     * @param upDatedList records to insert
     */
    @Override
    public void insert(List<Data> upDatedList) {
        log.trace("Enter insert");
        mongoTemplate.insert(upDatedList, Data.class);
        log.trace("Exit insert");
    }
}
4

0 回答 0