9

在过去,我们不得不ThreadLocal让程序在请求路径中携带数据,因为所有请求处理都是在该线程上完成的,像 Logback 这样的东西使用它MDC.put("requestId", getNewRequestId());

然后 Scala 和函数式编程出现了,Futures 也随之出现了Local.scala(至少我知道 twitterFuture有这个类)。 通过所有/等功能Future.scala了解Local.scala并传输上下文,这样我仍然可以做,然后在它经过许多线程后向下游发送,我仍然可以使用它访问它mapflatMapLocal.set("requestId", getNewRequestId());Local.get(...)

Soooo,我的问题是在 Java 中,我可以用新的CompletableFuture某处LocalContext或某个对象(不确定名称)做同样的事情吗?这样,我可以修改 Logback MDC 上下文以将其存储在该上下文中而不是ThreadLocal这样我不会丢失请求 id 和我的所有thenApply日​​志thenAccept,等等等等。仍然可以正常使用日志记录和-XrequestIdLogback 配置中的标志。

编辑:

举个例子。如果您有一个请求进来并且您正在使用 Log4j 或 Logback,您将在过滤器中设置MDC.put("requestId", requestId),然后在您的应用程序中,您将记录许多日志语句:

log.info("request came in for url="+url);
log.info("request is complete");

现在,在日志输出中它将显示:

INFO {time}: requestId425 request came in for url=/mypath
INFO {time}: requestId425 request is complete

这是使用一个技巧ThreadLocal来实现这一点。在 Twitter,我们使用 Scala 和 ScalaFuture中的 Twitter 以及一个Local.scala类。 Local.scala并且Future.scala联系在一起,因为我们仍然可以实现上述场景,这非常好,我们所有的日志语句都可以记录请求 ID,因此开发人员永远不必记住记录请求 ID,您可以跟踪单个客户的请求响应周期用那个身份证。

我在 Java 中看不到这一点 :( 这很不幸,因为有很多用例。也许有一些我没有看到的东西?

4

2 回答 2

2

我的解决方案主题是(它可以与 JDK 9+ 一起使用,因为自该版本以来公开了几个可覆盖的方法)

让整个生态系统了解 MDC

为此,我们需要解决以下场景:

  • 我们什么时候从这个类中获得 CompletableFuture 的新实例?→ 我们需要返回相同的 MDC 感知版本。
  • 我们什么时候从这个类之外获得 CompletableFuture 的新实例?→ 我们需要返回相同的 MDC 感知版本。
  • 在 CompletableFuture 类中使用哪个执行器?→ 在任何情况下,我们都需要确保所有执行者都了解 MDC

为此,让我们CompletableFuture通过扩展它来创建一个 MDC 感知版本类。我的版本如下所示

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

    @Override
    public CompletableFuture newIncompleteFuture() {
        return new MDCAwareCompletableFuture();
    }

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}

该类MDCAwareForkJoinPool看起来像(ForkJoinTask为简单起见,跳过了带参数的方法)

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(wrapWithMdcContext(task));
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrapWithMdcContext(task));
    }
}

包装的实用方法如下

public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
   MDC.clear();
   if (contextMap != null) {
       MDC.setContextMap(contextMap);
    }
}

以下是一些使用指南:

  • 使用类MDCAwareCompletableFuture而不是类CompletableFuture
  • 类中的几个方法CompletableFuture实例化了 self 版本,例如new CompletableFuture.... 对于此类方法(大多数公共静态方法),请使用替代方法来获取MDCAwareCompletableFuture. 使用替代方法的示例可能是,而不是使用CompletableFuture.supplyAsync(...),您可以选择new MDCAwareCompletableFuture<>().completeAsync(...)
  • 当您因为某个外部库返回CompletableFuture一个. 显然,您不能在该库中保留上下文,但是在您的代码命中应用程序代码后,此方法仍会保留上下文。MDCAwareCompletableFuturegetMDCAwareCompletionStageCompletableFuture
  • 在提供 executor 作为参数时,请确保它是 MDC Aware,例如MDCAwareForkJoinPool. 您也可以MDCAwareThreadPoolExecutor通过覆盖execute方法创建以服务于您的用例。你明白了!

您可以在一篇关于相同内容的帖子中找到上述所有内容的详细说明。

于 2019-12-30T04:09:51.280 回答
1

如果你遇到这个,只需戳这里的线程 http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-May/047867.html

实现类似 twitter Futures 的东西,它传输本地变量(很像 ThreadLocal,但传输状态)。

请参阅此处的 def respond() 方法以及它如何调用 Locals.save() 和 Locals.restore() https://github.com/simonratner/twitter-util/blob/master/util-core/src/main/ scala/com/twitter/util/Future.scala

如果 Java Authors 能解决这个问题,那么 logback 中的 MDC 将适用于所有 3rd 方库。在那之前,除非您可以更改 3rd 方库,否则它将无法正常工作(怀疑您是否可以这样做)。

于 2019-02-13T23:46:12.553 回答