173

在我们的软件中,我们广泛使用MDC来跟踪 Web 请求的会话 ID 和用户名等内容。这在原始线程中运行时工作正常。

但是,有很多事情需要在后台处理。为此,我们使用java.concurrent.ThreadPoolExecutorjava.util.Timer类以及一些自滚动的异步执行服务。所有这些服务都管理自己的线程池。

这就是Logback 的手册中关于在这样的环境中使用 MDC 的内容:

工作线程不能总是从启动线程继承映射的诊断上下文的副本。当 java.util.concurrent.Executors 用于线程管理时就是这种情况。例如,newCachedThreadPool 方法创建一个 ThreadPoolExecutor 和其他线程池代码一样,它具有复杂的线程创建逻辑。

在这种情况下,建议在将任务提交给执行程序之前在原始(主)线程上调用 MDC.getCopyOfContextMap()。当任务运行时,作为它的第一个动作,它应该调用 MDC.setContextMapValues() 以将原始 MDC 值的存储副本与新的 Executor 托管线程相关联。

这很好,但是很容易忘记添加这些调用,并且没有简单的方法来识别问题,直到为时已晚。Log4j 的唯一标志是您在日志中缺少 MDC 信息,而使用 Logback 您会获得陈旧的 MDC 信息(因为线程池中的线程从在其上运行的第一个任务继承其 MDC)。两者都是生产系统中的严重问题。

我看不出我们的情况有什么特别之处,但我在网上找不到太多关于这个问题的信息。显然,这不是很多人遇到的事情,所以必须有办法避免它。我们在这里做错了什么?

4

8 回答 8

92

是的,这也是我遇到的一个常见问题。有一些解决方法(如手动设置,如上所述),但理想情况下,您需要一个解决方案

  • 一致地设置 MDC;
  • 避免MDC不正确但您不知道的默认错误;和
  • 最大限度地减少对线程池使用方式的更改(例如CallableMyCallable随处可见的子类化,或类似的丑陋)。

这是我使用的满足这三个需求的解决方案。代码应该是不言自明的。

MoreExecutors.listeningDecorator()(附带说明,如果您使用 Guava 's,则可以创建此执行程序并将其提供给 Guava 's ListanableFuture。)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
于 2013-10-12T02:02:19.910 回答
28

我们遇到了类似的问题。您可能希望扩展 ThreadPoolExecutor 并覆盖 before/afterExecute 方法,以便在启动/停止新线程之前进行所需的 MDC 调用。

于 2011-06-20T19:08:16.190 回答
22

恕我直言,最好的解决方案是:

  • 利用ThreadPoolTaskExecutor
  • 实现你自己的TaskDecorator
  • 用它:executor.setTaskDecorator(new LoggingTaskDecorator());

装饰器可以如下所示:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}
于 2018-04-12T15:27:03.237 回答
12

这就是我使用固定线程池和执行程序的方式:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

在线程部分:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});
于 2018-01-10T09:45:51.160 回答
9

如果您在使用注释运行任务的 Spring 框架相关环境中遇到此问题,您可以使用TaskDecorator方法@Async来装饰任务。

此处提供了如何执行此操作的示例:

我遇到了这个问题,上面的文章帮助我解决了这个问题,所以我在这里分享它。

于 2020-03-18T14:26:39.003 回答
2

与之前发布的解决方案类似,newTaskFor在创建.RunnableCallableRunnableFuture

注意:因此,必须调用executorService'方法而不是方法。submitexecute

对于ScheduledThreadPoolExecutordecorateTask方法将被覆盖。

于 2016-09-12T16:59:42.397 回答
1

与此处现有答案类似的另一个变体是实现ExecutorService并允许将委托传递给它。然后使用泛型,它仍然可以公开实际的委托,以防万一想要获得一些统计信息(只要不使用其他修改方法)。

参考代码:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}
于 2019-12-24T08:57:27.360 回答
-3

我能够使用以下方法解决这个问题

在主线程中(Application.java,我的应用程序的入口点)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

在 Executer 调用的类的 run 方法中

MDC.setContextMap(Application.mdcContextMap);
于 2017-05-22T19:10:09.047 回答