I have some flink-streaming jobs running on a YARN cluster in yarn-session mode, and I want to separate each job's logs for debugging and other purposes. Currently I use AspectJ and the Log4j MDC to tag some of the logs, for example the line below, and that works:
[JobId-gamenews_push_readaction_c941c1dffe1450ec74ce6973cdb44961]-[2019-08-15 19:43:39] [ERROR][cn.migu.bi.flink.jobs.stream.gamenews.hot.IgniteReadSink][invoke][95]-> Exception while processing data: org.springframework.dao.DataIntegrityViolationException: PreparedStatementCallback; SQL [INSERT INTO readaction ( deviceId, userId, readedId, synTime ) VALUES ( ?, ?, ?, ? ) ]; Duplicate key during INSERT [key=SQL_PUBLIC_READACTION_abc3d77d_dbbe_45a7_8077_93b758c739ea_KEY [idHash=687625062, hash=223752637, DEVICEID=60c9e2f0a9d34c9f, READEDID=2000814]]; nested exception is java.sql.SQLException: Duplicate key during INSERT [key=SQL_PUBLIC_READACTION_abc3d77d_dbbe_45a7_8077_93b758c739ea_KEY [idHash=687625062, hash=223752637, DEVICEID=60c9e2f0a9d34c9f, READEDID=2000814]], dirty data sent: {"synTime":1565860801359,"readedArray":[2000814,2003419,2007497],"deviceId":"60c9e2f0a9d34c9f"}
But the rest of the logs seem hard to separate. This is the aspect I use:
```java
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public aspect Log {

    private Logger log = LoggerFactory.getLogger(Log.class);

    // Intercept the open(..) method of every class under cn.migu.bi.flink
    private pointcut executionJoinPoints(): execution(* cn.migu.bi.flink..*.open(..));

    before(): executionJoinPoints() {
        if (thisJoinPoint.getTarget() instanceof RichAsyncFunction) {
            RichAsyncFunction target = (RichAsyncFunction) thisJoinPoint.getTarget();
            Log4jUtils.initFlinkLog(target.getRuntimeContext().getMetricGroup());
        } else if (thisJoinPoint.getTarget() instanceof RichSinkFunction) {
            RichSinkFunction target = (RichSinkFunction) thisJoinPoint.getTarget();
            Log4jUtils.initFlinkLog(target.getRuntimeContext().getMetricGroup());
        }
    }
}
```
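One thing worth keeping in mind when debugging this: an MDC is essentially a per-thread map that the log layout reads when formatting each event. The stdlib-only sketch below illustrates the mechanism (it is not the real `org.slf4j.MDC` API the aspect relies on, and the class name `MiniMdc` is made up for illustration); it may explain why values put into the MDC on one thread never show up in log lines emitted by other threads or other JVMs, such as the TaskManager processes on the yarn-cluster.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the MDC mechanism: a per-thread key/value map.
// NOT the real org.slf4j.MDC; names here are hypothetical.
public class MiniMdc {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    public static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    public static String get(String key) {
        return CONTEXT.get().get(key);
    }

    public static void main(String[] args) throws InterruptedException {
        put("jobName", "gamenews_push-c941c1df");
        System.out.println("main thread sees: " + get("jobName"));

        // A different thread gets its own, initially empty, context map,
        // so the value set above is invisible here (prints null).
        Thread worker = new Thread(() ->
                System.out.println("worker thread sees: " + get("jobName")));
        worker.start();
        worker.join();
    }
}
```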
I also tried other approaches, such as changing Flink's source code and adding code like MDC.put("",""). That works when I run it from the IDE, but fails on the yarn-cluster:
So, is there any other way to solve this problem?
```java
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    // Added: tag the submitting thread's MDC with the job name and id
    MDC.put("jobName", jobGraph.getName() + "-" + jobGraph.getJobID().toString());

    try {
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                    new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
        } else {
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
```
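For reference, whichever way the job name gets into the MDC, the layout has to reference that key for it to appear in the output. A sketch of the pattern that would produce log lines shaped like the tagged example above, assuming log4j 1.x properties, a file appender named `file`, and the `jobName` MDC key used in the code:

```properties
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[JobId-%X{jobName}]-[%d{yyyy-MM-dd HH:mm:ss}] [%p][%c][%M][%L]-> %m%n
```

`%X{jobName}` expands to whatever the current thread last stored under that MDC key, or to nothing if the key was never set on that thread.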