/**
 * Executes the task's shell script, records the outcome and sends a notification mail.
 *
 * <p>The script named by {@code param.getShellName()} is run through {@code /bin/sh -c} with
 * {@code param.getRecallDate()} as its argument. An exit code of 0 is recorded as state 30
 * (success); anything else, including a failure to start or an interruption while waiting,
 * is recorded as state 40 (failure). Whatever happens, the task is removed from
 * {@code concurrentHashSet} and the child process is destroyed before this method returns.
 */
public void run() {
    log.info("concurrentHashSet:{},param:{}", concurrentHashSet, param);
    // Sentinel meaning "the script never produced an exit code"; any non-zero value is treated as failure.
    int exeState = 99999;
    // To run the script on another machine over ssh, the command can be replaced with:
    // "ssh rd@g1-jg-hadoop-01 \"source ~/.bash_profile ; bash -x %s %s\""
    // NOTE(review): shellName and recallDate are interpolated into the shell command line unquoted;
    // confirm that both values always come from a trusted source.
    String command = "source ~/.bash_profile ; bash -x %s %s";
    String commandF = String.format(command, param.getShellName(), param.getRecallDate());
    long taskId = param.getTaskId();
    Process process = null;
    boolean interrupted = false;
    try {
        // Merge stderr into stdout so that a single reader drains everything the script writes.
        // Reading only one of the two pipes can block the child forever once the other pipe fills up.
        process = new ProcessBuilder("/bin/sh", "-c", commandF)
                .redirectErrorStream(true)
                .start();
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line).append(System.lineSeparator());
            }
        }
        log.info("exe task thread param:{},commandF:{},execute shell info:{}", param, commandF, sb.toString());
        exeState = process.waitFor();
    } catch (InterruptedException | IOException e) {
        // Remember the interruption; the flag is restored only after the bookkeeping below has run,
        // so the state update and the mail are still attempted with a clear interrupt status.
        interrupted = e instanceof InterruptedException;
        log.error("execute shell error,exeState:{},command:{}", exeState, commandF, e);
    } finally {
        try {
            log.info("execute shell state:{}", exeState);
            // Known limitation: when several tables share one script, their states overwrite each other.
            boolean succeeded = exeState == 0;
            // 30 = executed successfully, 40 = execution failed.
            int state = succeeded ? 30 : 40;
            taskDao.updateStateByPrimaryKey(taskId, (short) state, new Date());
            // Mail parameters
            param.setState(state);
            String mailBody = SendMailUtil.beautyMailHtmlLayout(param);
            String mailToAddress = param.getMailToUser();
            // Failure mails use a fixed title; success mails use the title configured on the task.
            String mailTitle = succeeded ? param.getMailContentTitle() : "回溯任务";
            try {
                SendMailUtil.sendMail(mailToAddress, mailTitle, mailBody);
            } catch (Exception e) {
                // Mail delivery is best-effort: log and continue with the cleanup.
                log.error("send mail error,taskId:{},mailTo:{}", taskId, mailToAddress, e);
            }
        } finally {
            // Cleanup runs even if the state update above throws.
            ExecuteTaskModel taskModel = new ExecuteTaskModel();
            taskModel.setId(taskId);
            log.info("remove task in thread,id:{}", taskId);
            concurrentHashSet.remove(taskModel);
            log.info("remove from set success set:{}", concurrentHashSet);
            log.info("execute task thread exit!");
            System.out.println("Process exitValue: " + exeState);
            // process is still null when the shell could not be started at all.
            if (process != null) {
                process.destroy();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}