Jak używać MDC z pulami wątków?


146

W naszym oprogramowaniu szeroko używamy MDC do śledzenia takich rzeczy, jak identyfikatory sesji i nazwy użytkowników dla żądań internetowych. Działa to dobrze podczas uruchamiania w oryginalnym wątku. Jest jednak wiele rzeczy, które muszą zostać przetworzone w tle. W tym celu używamy klas java.concurrent.ThreadPoolExecutori java.util.Timerwraz z niektórymi usługami wykonywania asynchronicznego samoczynnego przejścia . Wszystkie te usługi zarządzają własną pulą wątków.

Oto, co podręcznik Logback ma do powiedzenia na temat używania MDC w takim środowisku:

Kopia odwzorowanego kontekstu diagnostycznego nie zawsze może być dziedziczona przez wątki robocze z wątku inicjującego. Dzieje się tak, gdy do zarządzania wątkami używany jest java.util.concurrent.Executors. Na przykład metoda newCachedThreadPool tworzy ThreadPoolExecutor i podobnie jak inny kod puli wątków ma skomplikowaną logikę tworzenia wątków.

W takich przypadkach zaleca się wywołanie MDC.getCopyOfContextMap () w oryginalnym (głównym) wątku przed przesłaniem zadania do modułu wykonawczego. Po uruchomieniu zadania, jako pierwsze działanie, powinno wywołać MDC.setContextMapValues ​​() w celu skojarzenia przechowywanej kopii oryginalnych wartości MDC z nowym wątkiem zarządzanym przez moduł Executor.

Byłoby dobrze, ale bardzo łatwo jest zapomnieć o dodaniu tych połączeń i nie ma łatwego sposobu na rozpoznanie problemu, dopóki nie będzie za późno. Jedynym znakiem w Log4j jest to, że w dziennikach brakuje informacji MDC, a dzięki Logback otrzymujesz nieaktualne informacje MDC (ponieważ wątek w puli bieżnika dziedziczy swój MDC z pierwszego zadania, które zostało na nim uruchomione). Oba są poważnymi problemami w systemie produkcyjnym.

Nie uważam naszej sytuacji za wyjątkową, ale nie mogłem znaleźć wiele informacji na temat tego problemu w sieci. Najwyraźniej nie jest to coś, z czym wielu ludzi się boryka, więc musi istnieć sposób, aby tego uniknąć. Co tu robimy źle?


1
Jeśli aplikacja jest wdrażana w środowisku JEE, można użyć przechwytywaczy Java do ustawiania kontekstu MDC przed wywołaniem EJB.
Maxim Kirilov

2
Od wersji logback 1.1.5 wartości MDC nie są już dziedziczone przez wątki podrzędne.
Ceki


2
@Ceki Dokumentacja wymaga aktualizacji: „Wątek potomny automatycznie dziedziczy kopię zmapowanego kontekstu diagnostycznego swojego rodzica”. logback.qos.ch/manual/mdc.html
Steffen

Utworzyłem żądanie ściągnięcia do slf4j, które rozwiązuje problem używania MDC między wątkami (link github.com/qos-ch/slf4j/pull/150 ). Być może, jeśli ludzie skomentują i poproszą o to, uwzględnią zmianę w SLF4J :)
Mężczyzna

Odpowiedzi:


79

Tak, to jest częsty problem, z którym też się spotykam. Istnieje kilka obejść (takich jak ręczne ustawienie, zgodnie z opisem), ale najlepiej byłoby znaleźć takie rozwiązanie

  • Ustawia konsekwentnie MDC;
  • Pozwala uniknąć ukrytych błędów, w przypadku których MDC jest niepoprawne, ale o tym nie wiesz; i
  • Minimalizuje zmiany w sposobie korzystania z pul wątków (np. Podklasy Callablez MyCallablewszędzie lub podobną brzydotę).

Oto rozwiązanie, którego używam, które spełnia te trzy potrzeby. Kod powinien być zrozumiały.

(Na marginesie, ten executor może zostać utworzony i dostarczony do guawy MoreExecutors.listeningDecorator(), jeśli używasz guawy ListanableFuture).

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

Jeśli poprzedni kontekst nie jest pusty, czy nie zawsze jest to śmieć? Dlaczego go nosisz?
djjeck,

2
Dobrze; nie należy go ustawiać. Wydaje się, że to dobra higiena, np. Jeśli metoda wrap () została odsłonięta i używana przez kogoś innego w drodze.
jlevy

Czy możesz podać odniesienie do tego, w jaki sposób ten MdcThreadPoolExecutor został dołączony lub przywoływany przez Log4J2? Czy jest gdzieś, gdzie musimy konkretnie odwołać się do tej klasy, czy jest to zrobione „automagicznie”? Nie używam guawy. Mógłbym, ale chciałbym wiedzieć, czy jest inny sposób, zanim go użyję.
jcb

Jeśli dobrze rozumiem twoje pytanie, odpowiedź brzmi tak, to "magiczne" zmienne lokalne wątku w SLF4J - zobacz implementacje MDC.setContextMap () itd. Poza tym, przy okazji, używa SLF4J, a nie Log4J, co jest lepsze tak jak działa z Log4j, Logback i innymi konfiguracjami rejestrowania.
jlevy

1
Dla kompletności: jeśli używasz Springa ThreadPoolTaskExecutorzamiast zwykłej Javy ThreadPoolExecutor, możesz użyć MdcTaskDecoratoropisanego na moelholm.com/2017/07/24/ ...
Pino

27

Napotkaliśmy podobny problem. Możesz chcieć rozszerzyć ThreadPoolExecutor i zastąpić metody before / afterExecute, aby wykonać wywołania MDC, których potrzebujesz przed uruchomieniem / zatrzymaniem nowych wątków.


10
Metody beforeExecute(Thread, Runnable)i afterExecute(Runnable, Throwable)mogą być pomocne w innych przypadkach, ale nie jestem pewien, jak to zadziała w przypadku ustawiania MDC. Oba są wykonywane w zwołanym wątku. Oznacza to, że musisz być w stanie zdobyć zaktualizowaną mapę z głównego wątku wcześniej beforeExecute.
Kenston Choi

Lepiej jest ustawić wartości MDC w filtrze, co oznacza, że ​​gdy żądanie jest przetwarzane przez logikę biznesową, kontekst nie zostanie zaktualizowany. Myślę, że nie powinniśmy aktualizować MDC wszędzie w aplikacji
dereck

15

IMHO najlepszym rozwiązaniem jest:

  • posługiwać się ThreadPoolTaskExecutor
  • wdrożyć własne TaskDecorator
  • Użyj tego: executor.setTaskDecorator(new LoggingTaskDecorator());

Dekorator może wyglądać następująco:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}

Przepraszam, nie bardzo wiem, co masz na myśli. UPDATE: Myślę, że teraz widzę, poprawi moją odpowiedź.
Tomáš Myšík

6

Oto jak robię to ze stałymi pulami wątków i programami wykonawczymi:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

W części gwintowania:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});

2

Podobnie jak w przypadku wcześniej opublikowanych rozwiązań, newTaskFormetody dla Runnablei Callablemożna nadpisać w celu zawijania argumentu (patrz zaakceptowane rozwiązanie) podczas tworzenia pliku RunnableFuture.

Uwaga: W związku z tym zamiast metody należy wywołać metodę executorService's .submitexecute

Albowiem ScheduledThreadPoolExecutor, te decorateTaskmetody będą nadpisywane zamiast.



0

Inną odmianą podobną do istniejących tutaj odpowiedzi jest zaimplementowanie ExecutorServicei zezwolenie na przekazanie delegatu. Następnie, używając typów ogólnych, nadal może ujawniać faktycznego delegata na wypadek, gdyby ktoś chciał uzyskać jakieś statystyki (o ile nie są używane inne metody modyfikacji).

Kod referencyjny:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}

-3

Udało mi się to rozwiązać, stosując następujące podejście

W głównym wątku (Application.java, punkt wejścia mojej aplikacji)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

W metodzie uruchamiania klasy, która jest wywoływana przez Executer

MDC.setContextMap(Application.mdcContextMap);
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.