跟进如何将 MDC 与线程池一起使用?如何将 MDC 与 一起使用ForkJoinPool
?具体来说,我如何在执行任务之前包装一个ForkJoinTask
MDC 值?
问问题
4912 次
3 回答
8
以下似乎对我有用:
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.MDC;
/**
* A {@link ForkJoinPool} that inherits MDC contexts from the thread that queues a task.
*
* @author Gili Tzabari
*/
public final class MdcForkJoinPool extends ForkJoinPool
{
/**
* Creates a new MdcForkJoinPool.
*
* @param parallelism the parallelism level. For default value, use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value, use
* {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that terminate due to unrecoverable errors encountered
* while executing tasks. For default value, use {@code null}.
* @param asyncMode if true, establishes local first-in-first-out scheduling mode for forked tasks that are never
* joined. This mode may be more appropriate than default locally stack-based mode in applications
* in which worker threads only process event-style asynchronous tasks. For default value, use
* {@code false}.
* @throws IllegalArgumentException if parallelism less than or equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and the caller is not permitted to modify threads
* because it does not hold
* {@link java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public MdcForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,
boolean asyncMode)
{
super(parallelism, factory, handler, asyncMode);
}
@Override
public void execute(ForkJoinTask<?> task)
{
// See http://stackoverflow.com/a/19329668/14731
super.execute(wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public void execute(Runnable task)
{
// See http://stackoverflow.com/a/19329668/14731
super.execute(wrap(task, MDC.getCopyOfContextMap()));
}
private <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task, Map<String, String> newContext)
{
return new ForkJoinTask<T>()
{
private static final long serialVersionUID = 1L;
/**
* If non-null, overrides the value returned by the underlying task.
*/
private final AtomicReference<T> override = new AtomicReference<>();
@Override
public T getRawResult()
{
T result = override.get();
if (result != null)
return result;
return task.getRawResult();
}
@Override
protected void setRawResult(T value)
{
override.set(value);
}
@Override
protected boolean exec()
{
// According to ForkJoinTask.fork() "it is a usage error to fork a task more than once unless it has completed
// and been reinitialized". We therefore assume that this method does not have to be thread-safe.
Map<String, String> oldContext = beforeExecution(newContext);
try
{
task.invoke();
return true;
}
finally
{
afterExecution(oldContext);
}
}
};
}
private Runnable wrap(Runnable task, Map<String, String> newContext)
{
return () ->
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
task.run();
}
finally
{
afterExecution(oldContext);
}
};
}
/**
* Invoked before running a task.
*
* @param newValue the new MDC context
* @return the old MDC context
*/
private Map<String, String> beforeExecution(Map<String, String> newValue)
{
Map<String, String> previous = MDC.getCopyOfContextMap();
if (newValue == null)
MDC.clear();
else
MDC.setContextMap(newValue);
return previous;
}
/**
* Invoked after running a task.
*
* @param oldValue the old MDC context
*/
private void afterExecution(Map<String, String> oldValue)
{
if (oldValue == null)
MDC.clear();
else
MDC.setContextMap(oldValue);
}
}
和
import java.util.Map;
import java.util.concurrent.CountedCompleter;
import org.slf4j.MDC;
/**
* A {@link CountedCompleter} that inherits MDC contexts from the thread that queues a task.
*
* @author Gili Tzabari
* @param <T> The result type returned by this task's {@code get} method
*/
public abstract class MdcCountedCompleter<T> extends CountedCompleter<T>
{
private static final long serialVersionUID = 1L;
private final Map<String, String> newContext;
/**
* Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
*/
protected MdcCountedCompleter()
{
this(null);
}
/**
* Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
*
* @param completer this task's completer; {@code null} if none
*/
protected MdcCountedCompleter(CountedCompleter<?> completer)
{
super(completer);
this.newContext = MDC.getCopyOfContextMap();
}
/**
* The main computation performed by this task.
*/
protected abstract void computeWithContext();
@Override
public final void compute()
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
computeWithContext();
}
finally
{
afterExecution(oldContext);
}
}
/**
* Invoked before running a task.
*
* @param newValue the new MDC context
* @return the old MDC context
*/
private Map<String, String> beforeExecution(Map<String, String> newValue)
{
Map<String, String> previous = MDC.getCopyOfContextMap();
if (newValue == null)
MDC.clear();
else
MDC.setContextMap(newValue);
return previous;
}
/**
* Invoked after running a task.
*
* @param oldValue the old MDC context
*/
private void afterExecution(Map<String, String> oldValue)
{
if (oldValue == null)
MDC.clear();
else
MDC.setContextMap(oldValue);
}
}
MdcForkJoinPool
针对而不是常见的 ForkJoinPool运行您的任务。- 扩展
MdcCountedCompleter
而不是CountedCompleter
.
于 2016-03-18T02:34:10.480 回答
1
这是与@Gili 的回答一起提供的一些附加信息。
测试表明该解决方案有效(请注意,会有没有上下文的行,但至少它们不会是错误的上下文,这是正常 ForkJoinPool 发生的情况)。
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.OutputStreamAppender;
public class MDCForkJoinPoolTest {
private static final Logger log = (Logger) LoggerFactory.getLogger("mdc-test");
// you can demonstrate the problem I'm trying to fix by changing the below to a normal ForkJoinPool and then running the test
private ForkJoinPool threads = new MDCForkJoinPool(16);
private Semaphore threadsRunning = new Semaphore(-99);
private ByteArrayOutputStream bio = new ByteArrayOutputStream();
@Test
public void shouldCopyManagedDiagnosticContextWhenUsingForkJoinPool() throws Exception {
for (int i = 0 ; i < 100; i++) {
Thread t = new Thread(simulatedRequest(), "MDC-Test-"+i);
t.setDaemon(true);
t.start();
}
// set up the appender to grab the output
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
OutputStreamAppender<ILoggingEvent> appender = new OutputStreamAppender<>();
LogbackEncoder encoder = new LogbackEncoder();
encoder.setPattern("%X{mdc_val:-}=%m%n");
encoder.setContext(lc);
encoder.start();
appender.setEncoder(encoder);
appender.setImmediateFlush(true);
appender.setContext(lc);
appender.setOutputStream(bio);
appender.start();
log.addAppender(appender);
log.setAdditive(false);
log.setLevel(Level.INFO);
assertThat("timed out waiting for threads to complete.", threadsRunning.tryAcquire(300, TimeUnit.SECONDS), is(true));
Set<String> ids = new HashSet<>();
try (BufferedReader r = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bio.toByteArray()), Charset.forName("utf8")))) {
r.lines().forEach(line->{
System.out.println(line);
String[] vals = line.split("=");
if (!vals[0].isEmpty()) {
ids.add(vals[0]);
assertThat(vals[1], startsWith(vals[0]));
}
});
}
assertThat(ids.size(), is(100));
}
private Runnable simulatedRequest() {
return () -> {
String id = UUID.randomUUID().toString();
MDC.put("mdc_val", id);
Map<String, String> context = MDC.getCopyOfContextMap();
threads.submit(()->{
MDC.setContextMap(context);
IntStream.range(0, 100).parallel().forEach((i)->{
log.info("{} - {}", id, i);
});
}).join();
threadsRunning.release();
};
}
}
此外,以下是原始答案中应覆盖的其他方法。
@Override
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
return super.submit(wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return super.submit(wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return super.submit(wrap(task, MDC.getCopyOfContextMap()), result);
}
@Override
public ForkJoinTask<?> submit(Runnable task) {
return super.submit(wrap(task, MDC.getCopyOfContextMap()));
}
private <T> Callable<T> wrap(Callable<T> task, Map<String, String> newContext)
{
return () ->
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
return task.call();
}
finally
{
afterExecution(oldContext);
}
};
}
于 2018-06-12T15:01:31.223 回答
0
我不熟悉,ForkJoinPool
但您可以将感兴趣的 MDC 键/值传递给ForkJoinTask
您实例化的实例,然后再将它们提交到ForkJoinPool
.
鉴于从 logback 版本 1.1.5 开始,MDC 值不会被子线程继承,因此没有太多选择。他们是
- 在实例化实例时将相关的 MDC 键/值传递给
ForkJoinTask
实例 - 扩展
ForkJoinPool
,以便将 MDC 键/值传递给新创建的线程 - 创建您自己的 ThreadFactory 将 MDC 键/值设置为新创建的线程
请注意,我实际上并没有实现选项 2. 或 3。
于 2016-03-17T05:21:15.460 回答