
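I have several flink-streaming jobs running on a YARN cluster in YARN session mode, and I want to separate the logs of each job for debugging and other purposes. Currently I use AspectJ and Log4j MDC to tag some of the log lines with the job, as in the log entry below, and that part works: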

[JobId-gamenews_push_readaction_c941c1dffe1450ec74ce6973cdb44961]-[2019-08-15 19:43:39] [ERROR][cn.migu.bi.flink.jobs.stream.gamenews.hot.IgniteReadSink][invoke][95]-> 处理数据发生异常:org.springframework.dao.DataIntegrityViolationException: PreparedStatementCallback; SQL [INSERT INTO readaction ( deviceId, userId, readedId, synTime ) VALUES ( ?, ?, ?, ? ) ]; Duplicate key during INSERT [key=SQL_PUBLIC_READACTION_abc3d77d_dbbe_45a7_8077_93b758c739ea_KEY [idHash=687625062, hash=223752637, DEVICEID=60c9e2f0a9d34c9f, READEDID=2000814]]; nested exception is java.sql.SQLException: Duplicate key during INSERT [key=SQL_PUBLIC_READACTION_abc3d77d_dbbe_45a7_8077_93b758c739ea_KEY [idHash=687625062, hash=223752637, DEVICEID=60c9e2f0a9d34c9f, READEDID=2000814]],发送脏数据:{"synTime":1565860801359,"readedArray":[2000814,2003419,2007497],"deviceId":"60c9e2f0a9d34c9f"}

However, the remaining log lines seem hard to separate. This is the aspect I use:

public aspect Log {

    private Logger log = LoggerFactory.getLogger(Log.class);

    // Match every open(..) method under cn.migu.bi.flink and its subpackages.
    private pointcut executionJoinPoints():  execution (* cn.migu.bi.flink..*.open(..));

    // Before open(..) runs, initialize the per-job log context from the operator's metric group.
    before(): executionJoinPoints(){
        if (thisJoinPoint.getTarget() instanceof RichAsyncFunction) {
            RichAsyncFunction target = (RichAsyncFunction) thisJoinPoint.getTarget();
            Log4jUtils.initFlinkLog(target.getRuntimeContext().getMetricGroup());
        } else if (thisJoinPoint.getTarget() instanceof RichSinkFunction) {
            RichSinkFunction target = (RichSinkFunction) thisJoinPoint.getTarget();
            Log4jUtils.initFlinkLog(target.getRuntimeContext().getMetricGroup());
        }
    }
}

I also tried other approaches, such as modifying Flink's source code to add calls like MDC.put("", ""). That works when I run the job in the IDE, but it fails on the YARN cluster.

So, is there another way to solve this problem? The submitJob method I modified in Flink is shown below:

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        // Added line: put the job name and ID into the MDC of the current thread.
        MDC.put("jobName", jobGraph.getName() + "-" + jobGraph.getJobID().toString());
        try {
            if (isDuplicateJob(jobGraph.getJobID())) {
                return FutureUtils.completedExceptionally(
                    new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
            } else {
                return internalSubmitJob(jobGraph);
            }
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }
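One possible explanation for the difference between the IDE and the cluster, offered as an assumption rather than a confirmed diagnosis: MDC is managed per thread, so a value put on the thread that handles submitJob is not automatically visible to the threads that run the operators, and on YARN those operators run in separate TaskManager JVMs. The sketch below illustrates only the per-thread part, using a plain ThreadLocal as a stand-in for MDC. The class name MdcThreadLocalSketch and the method seenOnCallerAndWorker are hypothetical and not part of Flink or Log4j, and real MDC implementations differ in whether newly created child threads inherit values.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MdcThreadLocalSketch {

    // Stand-in for MDC: one value slot per thread (a real MDC keeps a map per thread).
    private static final ThreadLocal<String> JOB_NAME = new ThreadLocal<>();

    // Sets the value on the calling thread, then reads it back on the calling
    // thread and on a separate pool thread. Returns {seenOnCaller, seenOnWorker}.
    static String[] seenOnCallerAndWorker(String jobName) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            JOB_NAME.set(jobName);
            String onCaller = JOB_NAME.get();
            // The pool thread has its own, still empty, slot.
            String onWorker = pool.submit(() -> JOB_NAME.get()).get();
            return new String[] {onCaller, onWorker};
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            JOB_NAME.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = seenOnCallerAndWorker("gamenews_push_readaction");
        System.out.println("caller thread sees: " + seen[0]);
        System.out.println("worker thread sees: " + seen[1]);
    }
}
```

Running main prints the job name for the caller thread and null for the worker thread. If the same effect applies here, the context would have to be set on the threads that actually emit the log lines, which is what the open(..) aspect above does for the two function types it covers.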