Obsługa wyjątków od zadań Java ExecutorService


213

Próbuję użyć ThreadPoolExecutorklasy Javy do uruchamiania dużej liczby ciężkich zadań z ustaloną liczbą wątków. Każde z zadań ma wiele miejsc, w których może się nie powieść z powodu wyjątków.

Podklasowałem ThreadPoolExecutori przesłoniłem afterExecutemetodę, która ma zapewnić nieprzechwycone wyjątki napotkane podczas uruchamiania zadania. Nie mogę jednak sprawić, by działało.

Na przykład:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

Dane wyjściowe tego programu to „Wszystko w porządku - sytuacja normalna!” nawet jeśli tylko Runnable przesłany do puli wątków zgłasza wyjątek. Masz jakieś wskazówki co się tutaj dzieje?

Dzięki!


nigdy nie pytałeś o Przyszłość zadania, co się tam wydarzyło. Cały wykonawca usługi lub program nie ulegnie awarii. Wyjątek jest przechwytywany i jest zawijany pod ExecutionException. I czy ponownie wrzuci, jeśli wywołasz future.get (). PS: Future.isDone () [Proszę przeczytać prawdziwą nazwę interfejsu API] zwróci wartość true, nawet jeśli uruchamialne działanie zakończy się błędnie. Ponieważ zadanie jest wykonywane naprawdę.
Jai Pandit

Odpowiedzi:


156

Z dokumentów :

Uwaga: Gdy akcje są zawarte w zadaniach (takich jak FutureTask) albo jawnie, albo za pomocą metod takich jak wysyłanie, te obiekty zadań wychwytują i utrzymują wyjątki obliczeniowe, dzięki czemu nie powodują nagłego zakończenia, a wyjątki wewnętrzne nie są przekazywane do tej metody .

Gdy prześlesz Runnable, zostanie on zapakowany w Przyszłość.

Twój afterExecute powinien wyglądać mniej więcej tak:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

7
Dzięki, skończyło się na tym rozwiązaniu. Dodatkowo, na wypadek, gdyby ktoś był zainteresowany: inni zasugerowali, że nie należy podklasować usługi ExecutorService, ale zrobiłem to, ponieważ chciałem monitorować zadania po ich zakończeniu, zamiast czekać na zakończenie wszystkich zadań, a następnie wywoływać funkcję get () na wszystkich zwróconych kontraktach futures .
Tom

1
Innym podejściem do podklasowania modułu wykonującego jest podklasowanie FutureTask i zastąpienie metody „zrobione”
nos

1
Tom >> Czy możesz opublikować swój przykładowy fragment kodu, w którym
podklasujesz

1
Ta odpowiedź nie zadziała, jeśli używasz ComplableFuture.runAsync, ponieważ afterExecute będzie zawierał obiekt, który jest pakietem prywatnym i nie ma możliwości uzyskania dostępu do rzucanego obiektu. Obejrzałem to, kończąc rozmowę. Zobacz moją odpowiedź poniżej.
mmm

2
Czy musimy sprawdzić, czy przyszła przyszłość się skończy future.isDone()? Ponieważ afterExecutejest uruchamiany po Runnablezakończeniu, zakładam, że future.isDone()zawsze zwraca true.
Searene

248

OSTRZEŻENIE : Należy zauważyć, że to rozwiązanie zablokuje wątek wywołujący.


Jeśli chcesz przetworzyć wyjątki zgłoszone przez zadanie, zazwyczaj lepiej jest użyć Callableniż Runnable.

Callable.call() wolno zgłaszać sprawdzone wyjątki, które są propagowane z powrotem do wątku wywołującego:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Jeśli Callable.call()zgłosi wyjątek, zostanie on zawinięty w ExecutionExceptioni rzucony przez Future.get().

Jest to prawdopodobnie znacznie lepsze niż podklasowanie ThreadPoolExecutor. Daje to również możliwość ponownego przesłania zadania, jeśli wyjątek jest możliwy do odzyskania.


5
> Callable.call () może zgłaszać sprawdzone wyjątki, które są propagowane z powrotem do wątku wywołującego: Zwróć uwagę, że zgłoszony wyjątek zostanie propagowany do wątku wywołującego tylko wtedy, gdy future.get()zostanie wywołana jego przeciążona wersja.
nylowany

16
Jest idealny, ale co zrobić, jeśli uruchamiam zadania równolegle i nie chcę blokować wykonywania?
Grigory Kislin

43
Nie używaj tego rozwiązania, ponieważ psuje to cały cel korzystania z ExecutorService. ExecutorService to mechanizm wykonywania asynchronicznego, który może wykonywać zadania w tle. Wywołanie metody future.get () zaraz po jej wykonaniu spowoduje zablokowanie wątku wywołującego do momentu zakończenia zadania.
user1801374

2
To rozwiązanie nie powinno być tak wysoko oceniane. Future.get () działa synchronicznie i będzie działał jako bloker, dopóki Runnable lub Callable nie zostaną wykonane i jak wspomniano powyżej, nie udaje się użyć usługi Executora
Super Hans

2
Jak wskazał #ylhylated, zasługuje na BŁĄD jdk. Jeśli nie zostanie wywołana funkcja Future.get (), wszelkie nieprzechwycone wyjątki od wywoływania będą dyskretnie ignorowane. Bardzo zły projekt .... spędziłem ponad 1 dzień, aby dowiedzieć się, że biblioteka używa tego i jdk w milczeniu zignorował wyjątki. I to nadal istnieje w jdk12.
Ben Jiang

18

Wyjaśnienie tego zachowania znajduje się w javadoc dla afterExecute :

Uwaga: Gdy akcje są zawarte w zadaniach (takich jak FutureTask) albo jawnie, albo za pomocą metod takich jak wysyłanie, te obiekty zadań wychwytują i utrzymują wyjątki obliczeniowe, dzięki czemu nie powodują nagłego zakończenia, a wyjątki wewnętrzne nie są przekazywane do tej metody .


10

Obejrzałem to, owijając dostarczony plik wykonywalny przesłany do executora.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);

3
Możesz poprawić czytelność za pomocą whenComplete()metody CompletableFuture.
Eduard Wirch

@EduardWirch to działa, ale nie można odrzucić wyjątku od whenComplete ()
Akshat

7

Korzystam VerboseRunnablez klasy z jcabi-log , która połyka wszystkie wyjątki i rejestruje je. Bardzo wygodne, na przykład:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

3

Innym rozwiązaniem byłoby użycie ManagedTask i ManagedTaskListener .

Potrzebujesz Callable lub Runnable, który implementuje interfejs ManagedTask .

Metoda getManagedTaskListenerzwraca żądaną instancję.

public ManagedTaskListener getManagedTaskListener() {

I wdrożyć w ManagedTaskListener ten taskDonesposób:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Więcej informacji na temat cyklu życia zarządzanego zadania i detektora .


2

To działa

  • Wywodzi się z SingleThreadExecutor, ale możesz go łatwo dostosować
  • Kod lamdas Java 8, ale łatwy do naprawienia

Stworzy Executora z jednym wątkiem, który może uzyskać wiele zadań; i poczeka, aż bieżące zakończy wykonywanie, a rozpocznie się od następnego

W przypadku nieuczciwego błędu lub wyjątku uncaughtExceptionHandler go złapie

klasa publiczna SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions (końcowy Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable) -> {
            ostatni wątek newThread = nowy wątek (runnable, „SingleThreadExecutorWithExceptions”);
            newThread.setUncaughtExceptionHandler ((końcowy wątek caugthThread, końcowy Throwble wrzut) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, thrownble);
            });
            return newThread;
        };
        zwraca nowy FinalizableDelegatedExecutorService
                (nowy ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        nowa LinkedBlockingQueue (),
                        fabryka){


                    chronione void afterExecute (Runnable runnable, Throwable throwble) {
                        super.afterExecute (runnable, throwble);
                        if (throwable == null && runnable instanceof Future) {
                            próbować {
                                Future future = (Future) runnable;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (CancellationException ce) {
                                do rzucania = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (InterruptedException ie) {
                                Thread.currentThread (). Interrupt (); // zignoruj ​​/ zresetuj
                            }
                        }
                        if (do rzucenia! = null) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), do rzucania);
                        }
                    }
                });
    }



    prywatna klasa statyczna FinalizableDelegatedExecutorService
            rozszerza DelegatedExecutorService {
        FinalizableDelegatedExecutorService (ExecutorService executor) {
            super (wykonawca);
        }
        protected void finalize () {
            super.shutdown ();
        }
    }

    / **
     * Klasa opakowania, która udostępnia tylko metody ExecutorService
     * implementacji ExecutorService.
     * /
    prywatna klasa statyczna DelegatedExecutorService rozszerza AbstractExecutorService {
        prywatny końcowy ExecutorService e;
        DelegatedExecutorService (ExecutorService executor) {e = executor; }
        public void execute (polecenie Runnable) {e.execute (polecenie); }
        public void shutdown () {e.shutdown (); }
        public List shutdownNow () {return e.shutdownNow (); }
        public boolean isShutdown () {return e.isShutdown (); }
        public boolean isTerminated () {return e.isTerminated (); }
        public boolean awaitTermination (długi limit czasu, jednostka TimeUnit)
                zgłasza InterruptedException {
            zwróć e.awaitTermination (limit czasu, jednostka);
        }
        public Prześlij w przyszłości (zadanie Runnable) {
            return e.submit (zadanie);
        }
        public Prześlij w przyszłości (zadanie na żądanie) {
            return e.submit (zadanie);
        }
        public Prześlij w przyszłości (zadanie Runnable, wynik T) {
            return e.submit (zadanie, wynik);
        }
        lista publiczna> invokeAll (kolekcja> zadania)
                zgłasza InterruptedException {
            return e.invokeAll (zadania);
        }
        lista publiczna> invokeAll (kolekcja> zadania,
                                             długi limit czasu, jednostka TimeUnit)
                zgłasza InterruptedException {
            zwróć e.invokeAll (zadania, limit czasu, jednostka);
        }
        public T invokeAny (kolekcja> zadania)
                zgłasza InterruptedException, ExecutionException {
            return e.invokeAny (zadania);
        }
        public T invokeAny (kolekcja> zadania,
                               długi limit czasu, jednostka TimeUnit)
                zgłasza InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny (zadania, limit czasu, jednostka);
        }
    }



    private SingleThreadExecutorWithExceptions () {}
}

Korzystanie z finalizacji jest niestety trochę niestabilne, ponieważ zostanie nazwane „później, gdy zbieracz śmieci go zbierze” (a może nie w przypadku wątku, dunno) ...
rogerdpack

1

Jeśli chcesz monitorować wykonanie zadania, możesz zakręcić 1 lub 2 wątkami (być może więcej w zależności od obciążenia) i użyć ich do pobrania zadań z opakowania ExecutionCompletionService.


0

Jeśli twój ExecutorServicepochodzi z zewnętrznego źródła (tzn. Nie jest możliwe podklasę ThreadPoolExecutori zastąpienie afterExecute()), możesz użyć dynamicznego serwera proxy, aby osiągnąć pożądane zachowanie:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}

0

Wynika to z tego, że AbstractExecutorService :: submitowijasz się runnablew RunnableFuture(nic poza FutureTask) jak poniżej

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Następnie executeprzekaże go Workeri Worker.run()zadzwoni poniżej.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

W końcu task.run();w powyższym kodzie zadzwoni FutureTask.run(). Oto kod obsługi wyjątku, dlatego NIE otrzymujesz oczekiwanego wyjątku.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

0

Jest to podobne do rozwiązania mmm, ale nieco bardziej zrozumiałe. Niech twoje zadania rozszerzą klasę abstrakcyjną, która otacza metodę run ().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}

-4

Zamiast podklasowania ThreadPoolExecutor, dostarczyłbym mu instancję ThreadFactory, która tworzy nowe wątki i udostępnia im UncaughtExceptionHandler


3
Próbowałem tego również, ale wydaje się, że metoda uncaughtException nigdy nie jest wywoływana. Wierzę, że dzieje się tak, ponieważ wątek roboczy w klasie ThreadPoolExecutor przechwytuje wyjątki.
Tom

5
Metoda uncaughtException nie jest wywoływana, ponieważ metoda wysyłania ExecutorService otacza Callable / Runnable in a Future; rejestrowany jest tam wyjątek.
Emil Sit
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.