ExecutorService, jak czekać na zakończenie wszystkich zadań


196

Jaki jest najprostszy sposób oczekiwania na ExecutorServicezakończenie wszystkich zadań ? Moje zadanie jest głównie obliczeniowe, więc chcę po prostu uruchomić dużą liczbę zadań - po jednym na każdym rdzeniu. W tej chwili moja konfiguracja wygląda następująco:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTaskimplementuje runnable. Wygląda na to, że poprawnie wykonuje zadania, ale kod zawiesza się wait()przy IllegalMonitorStateException. To dziwne, bo bawiłem się przykładami zabawek i wyglądało na to, że działają.

uniquePhraseszawiera kilkadziesiąt tysięcy elementów. Czy powinienem używać innej metody? Szukam czegoś tak prostego, jak to możliwe


1
jeśli chcesz użyć funkcji czekaj (odpowiedzi mówią, że nie): Zawsze musisz zsynchronizować obiekt (w tym przypadku es), gdy chcesz na niego poczekać - blokada zostanie automatycznie zwolniona podczas oczekiwania
mihi

13
lepszym sposobem na zainicjowanie Executors.newFixedThreadPool(System.getRuntime().availableProcessors());

7
Runtime.getRuntime (). AvailableProcessors ();
Vik Gamov

[To] [1] jest interesującą alternatywą .. [1]: stackoverflow.com/questions/1250643/…
Răzvan Petruescu,

1
jeśli chcesz użyć CountDownLatch, jest to przykładowy kod: stackoverflow.com/a/44127101/4069305
Tuan Pham

Odpowiedzi:


213

Najprostszym podejściem jest użycie, ExecutorService.invokeAll()które robi to, co chcesz w jednej linijce. W swoim języku musisz zmodyfikować lub zawinąć, ComputeDTaskaby zaimplementować Callable<>, co może dać ci nieco większą elastyczność. Prawdopodobnie w Twojej aplikacji jest znacząca implementacja Callable.call(), ale oto sposób na jej zawinięcie, jeśli nie jest używany Executors.callable().

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

Jak zauważyli inni, w invokeAll()razie potrzeby możesz użyć wersji limitu czasu . W tym przykładzie answersbędzie zawierać wiązkę Futures, która zwróci null (patrz definicja Executors.callable(). Prawdopodobnie to, co chcesz zrobić, to niewielkie przeredagowanie, abyś mógł uzyskać przydatną odpowiedź lub odniesienie do instrumentu bazowego ComputeDTask, ale ja mogę nie mów z twojego przykładu.

Jeśli nie jest to jasne, pamiętaj, że invokeAll()nie powróci, dopóki wszystkie zadania nie zostaną zakończone. (tzn. wszystkie Futures w twojej answerskolekcji zostaną zgłoszone, .isDone()jeśli zostaniesz o to poproszony). Pozwala to uniknąć ręcznego wyłączania, oczekiwania na zakończenie itp. i pozwala ci to wykorzystać ponownie ExecutorServicestarannie dla wielu cykli, jeśli chcesz.

Istnieje kilka powiązanych pytań dotyczących SO:

Żadne z nich nie jest ściśle związane z twoim pytaniem, ale zapewniają odrobinę koloru na temat tego, jak ludzie myślą Executor/ ExecutorServicepowinni być wykorzystywani.


9
Jest to idealne, jeśli dodajesz wszystkie swoje zadania w partii i zawiesisz się na liście Callables, ale nie zadziała, jeśli wywołujesz ExecutorService.submit () w sytuacji wywołania zwrotnego lub pętli zdarzeń.
Desty

2
Myślę, że warto wspomnieć, że shutdown () wciąż powinien być wywoływany, gdy ExecutorService nie jest już potrzebny, w przeciwnym razie wątki nigdy się nie zakończą (z wyjątkiem przypadków, gdy corePoolSize = 0 lub allowCoreThreadTimeOut = true).
John29,

niesamowity! Właśnie tego szukałem. Wielkie dzięki za udostępnienie odpowiedzi. Pozwól mi to wypróbować.
MohamedSanaulla

59

Jeśli chcesz poczekać na zakończenie wszystkich zadań, użyj shutdownmetody zamiast wait. Następnie postępuj zgodnie z awaitTermination.

Ponadto można użyć Runtime.availableProcessorsdo uzyskania liczby wątków sprzętowych, aby można było poprawnie zainicjować pulę wątków.


27
shutdown () powstrzymuje ExecutorService od przyjmowania nowych zadań i zamyka bezczynne wątki robocze. Nie określono oczekiwania na zakończenie zamykania, a implementacja w ThreadPoolExecutor nie czeka.
Alain O'Dea,

1
@Alain - dzięki. Powinienem był wspomnieć o oczekiwaniu na termin. Naprawiony.
NG.

5
Co jeśli, aby zadanie się zakończyło, musi zaplanować dalsze zadania? Na przykład możesz wykonać wielowątkowe przechodzenie przez drzewo, które przekazuje gałęzie do wątków roboczych. W takim przypadku, ponieważ ExecutorService jest natychmiast zamykany, nie akceptuje żadnych rekurencyjnie zaplanowanych zadań.
Brian Gordon,

2
awaitTerminationwymaga limitu czasu jako parametru. Chociaż można podać skończony czas i umieścić wokół niego pętlę, aby poczekać, aż wszystkie wątki się zakończą, zastanawiałem się, czy istnieje bardziej eleganckie rozwiązanie.
Abhishek S

1
Masz rację, ale zobacz tę odpowiedź - stackoverflow.com/a/1250655/263895 - zawsze możesz dać jej niesamowicie długi czas
NG.

48

Jeśli czekanie na ExecutorServicezakończenie wszystkich zadań nie jest dokładnie twoim celem, ale raczej czekanie, aż konkretna partia zadań zostanie zakończona, możesz użyć CompletionService- w szczególności ExecutorCompletionService.

Chodzi o to, aby stworzyć ExecutorCompletionServiceowijania TWOJEJ Executor, złożyć jakąś znaną liczbę zadań poprzez CompletionService, po czym wyciągnąć tę samą liczbę wyników z kolejki realizacji przy użyciu albo take()(co bloki) lub poll()(co nie). Po narysowaniu wszystkich oczekiwanych wyników odpowiadających przesłanym zadaniom wiesz, że wszystkie zostały wykonane.

Pozwól mi powiedzieć to jeszcze raz, ponieważ nie jest to oczywiste z interfejsu: musisz wiedzieć, ile rzeczy wkładasz do CompletionService, aby wiedzieć, ile rzeczy spróbować wyciągnąć. Ma to szczególne znaczenie w przypadku take()metody: wywołaj ją o jeden raz za dużo, a ona zablokuje Twój wątek wywołujący, dopóki jakiś inny wątek nie prześle innego zadania do tego samego CompletionService.

Istnieje kilka przykładów pokazujących, jak korzystaćCompletionService z książki Java Concurrency in Practice .


To dobry kontrapunkt dla mojej odpowiedzi - powiedziałbym, że bezpośrednią odpowiedzią na pytanie jest invokeAll (); ale @seh ma rację, przesyłając grupy zadań do ES i czekając na ich zakończenie ... --JA
andersoj

@ om-nom-nom, dziękuję za aktualizację linków. Cieszę się, że odpowiedź jest nadal przydatna.
seh

1
Dobra odpowiedź, nie wiedziałem o tymCompletionService
Vic

1
Takie podejście należy zastosować, jeśli nie chcesz zamknąć istniejącej usługi ExecutorService, ale po prostu chcesz przesłać partię zadań i wiedzieć, kiedy wszystkie zostaną zakończone.
ToolmakerSteve

11

Jeśli chcesz poczekać, aż usługa modułu wykonującego zakończy działanie, zadzwoń, shutdown()a następnie poczekaj na terminację (jednostki, typ jednostki) , np awaitTermination(1, MINUTE). ExecutorService nie blokuje na swoim własnym monitorze, więc nie można używać waititp.


Myślę, że to czeka na zakończenie.
NG.

@SB - Dzięki - widzę, że moja pamięć jest omylna! Zaktualizowałem nazwę i dla pewności dodałem link.
mdma

Aby czekać „na zawsze”, użyj go jak awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); stackoverflow.com/a/1250655/32453
rogerdpack

Myślę, że jest to najłatwiejsze podejście
Shervin Asgari

1
@MosheElisha, jesteś pewien? docs.oracle.com/javase/8/docs/api/java/util/concurrent/… mówi: inicjuje uporządkowane zamknięcie, w którym wykonywane są wcześniej przesłane zadania, ale żadne nowe zadania nie będą akceptowane.
Jaime Hablutzel

7

Możesz czekać na zakończenie zadań w określonym przedziale czasu:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Lub możesz użyć ExecutorService . przesłać ( Runnable ) i zebrać przyszłe obiekty, które zwraca, i wywołać get () po kolei, aby czekać na zakończenie.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException jest niezwykle ważny, aby poprawnie się z nim obchodzić. To właśnie pozwala tobie lub użytkownikom twojej biblioteki bezpiecznie zakończyć długi proces.


6

Po prostu użyj

latch = new CountDownLatch(noThreads)

W każdym wątku

latch.countDown();

i jako bariera

latch.await();

6

Główna przyczyna wyjątku IllegalMonitorStateException :

Zgłaszany w celu wskazania, że ​​wątek próbował czekać na monitorze obiektu lub powiadomić inne wątki oczekujące na monitorze obiektu, nie posiadając określonego monitora.

Z twojego kodu właśnie wywołałeś wait () na ExecutorService bez posiadania blokady.

Poniższy kod naprawi IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

Wykonaj jedno z poniższych podejść, aby poczekać na zakończenie wszystkich zadań, które zostały przesłane ExecutorService.

  1. Iterację wszystkich Futurezadań z submitna ExecutorServicei sprawdzeniu stanu z blokowania połączeń get()na Futureobiekcie

  2. Korzystanie z invokeAll onExecutorService

  3. Korzystanie z CountDownLatch

  4. Korzystanie z ForkJoinPool lub newWorkStealingPool of Executors(od java 8)

  5. Zamknij pulę zgodnie z zaleceniami na stronie dokumentacji Oracle

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }

    Jeśli chcesz z wdziękiem czekać na zakończenie wszystkich zadań, gdy używasz opcji 5 zamiast opcji 1 do 4, zmień

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {

    do

    while(condition)który sprawdza co 1 minutę.


6

Możesz użyć ExecutorService.invokeAllmetody, wykona wszystkie zadania i poczeka, aż wszystkie wątki zakończą swoje zadanie.

Oto pełna wersja javadoc

Możesz także przeciążoną przez użytkownika wersję tej metody, aby określić limit czasu.

Oto przykładowy kod z ExecutorService.invokeAll

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

3

Mam również sytuację, że mam zestaw dokumentów do przeszukania. Zaczynam od początkowego dokumentu „źródłowego”, który powinien zostać przetworzony, dokument ten zawiera linki do innych dokumentów, które również powinny zostać przetworzone, i tak dalej.

W moim głównym programie chcę po prostu napisać coś takiego, w którym Crawlerkontroluje kilka wątków.

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

Taka sama sytuacja miałaby miejsce, gdybym chciał nawigować po drzewie; wskoczyłbym do węzła głównego, procesor dla każdego węzła dodawałby dzieci do kolejki w razie potrzeby, a wiązka wątków przetwarzałaby wszystkie węzły w drzewie, dopóki nie będzie już więcej.

W JVM nie mogłem znaleźć niczego, co moim zdaniem było nieco zaskakujące. Napisałem więc klasę, ThreadPoolktórej można użyć bezpośrednio lub podklasy, aby dodać metody odpowiednie dla domeny, np schedule(Document). Mam nadzieję, że to pomoże!

ThreadPool Javadoc | Maven


Doc Link nie żyje
Manti_Core

@Manti_Core - dzięki, zaktualizowano.
Adrian Smith

2

Dodaj wszystkie wątki w kolekcji i prześlij za pomocą invokeAll. Jeśli możesz użyć invokeAllmetody ExecutorService, JVM nie przejdzie do następnego wiersza, dopóki wszystkie wątki nie zostaną zakończone.

Oto dobry przykład: invokeAll za pośrednictwem ExecutorService


1

Prześlij swoje zadania do Runnera, a następnie poczekaj na wywołanie metody waitTillDone () w następujący sposób:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

Aby go użyć, dodaj następującą zależność grad / maven: 'com.github.matejtymes:javafixes:1.0'

Aby uzyskać więcej informacji, spójrz tutaj: https://github.com/MatejTymes/JavaFixes lub tutaj: http://matejtymes.blogspot.com/2016/04/executor-that-notified-you-when-task.html



0

Poczekam tylko na zakończenie executora z określonym limitem czasu, który Twoim zdaniem jest odpowiedni do wykonania zadań.

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

0

Wygląda na to, że potrzebujesz ForkJoinPooli użyj globalnej puli do wykonywania zadań.

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

Piękno polega na pool.awaitQuiescencetym, że metoda będzie blokować wykorzystanie wątku dzwoniącego do wykonywania swoich zadań, a następnie powrót, gdy będzie naprawdę pusty.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.