Jak czekać na zakończenie wszystkich wątków za pomocą ExecutorService?


381

Muszę wykonać pewną liczbę zadań 4 na raz, mniej więcej tak:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

Jak mogę otrzymać powiadomienie, gdy wszystkie zostaną ukończone? Na razie nie mogę wymyślić nic lepszego niż ustawienie globalnego licznika zadań i zmniejszenie go na końcu każdego zadania, a następnie monitorowanie w nieskończonej pętli tego licznika, aby stał się 0; lub uzyskaj listę kontraktów futures i w nieskończonej pętli monitor jest zrobiony dla nich wszystkich. Jakie są lepsze rozwiązania nieobejmujące nieskończonych pętli?

Dzięki.

Odpowiedzi:


446

Zasadniczo na ExecutorServicetelefon, shutdown()a następnie awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

9
właśnie do tego służy zamknięcie / oczekiwanie Terminacja
mat b

31
Jest dobrym wzorcem, jeśli obsługa tego zadania jest zdarzeniem jednorazowym. Jeśli jednak jest to powtarzane podczas tego samego środowiska uruchomieniowego, nie jest to optymalne, ponieważ należy wielokrotnie tworzyć i rozrywać wątki za każdym razem, gdy jest ono wykonywane.
sjlee

44
Szukam jakiejkolwiek oficjalnej dokumentacji Long.MAX_VALUE, TimeUnit.NANOSECONDSrównoważnej z brakiem limitu czasu.
Sam Harwell,

15
Nie mogę uwierzyć, że musisz użyć shutdown, aby dołączyć do wszystkich bieżących wątków (po użyciu shutdown nie możesz już nigdy użyć executora). Zasugeruj użycie listy Future's zamiast ...
rogerdpack,

20
@SamHarwell zobacz dokumentacjęjava.util.concurrent pakietu w sekcji: Aby poczekać „na zawsze”, możesz użyć wartościTimingLong.MAX_VALUE
beluchin

174

Użyj CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

i w ramach twojego zadania (załącz w try / wreszcie)

latch.countDown();

3
Nie ma 4 zadań. Wykonuje się „pewną liczbę zadań” 4 jednocześnie.
cletus,

2
Przepraszam, źle zrozumiałem pytanie. Tak, liczba zadań powinna być argumentem dla konstruktora CountDownLatch
ChssPly76

3
Uważam to rozwiązanie za bardziej eleganckie od innych, wygląda na to, że zostało stworzone do tego celu, i jest proste.
wvdschel,

3
Co zrobić, jeśli nie znasz liczby zadań przed rozpoczęciem?
cletus

11
@cletus - wtedy nie korzystasz z CountDownLatch :-) Pamiętaj, nie twierdzę, że takie podejście jest lepsze niż twoje. Uważam jednak, że w rzeczywistych sytuacjach życiowych ja nie wiem liczbę zadań, ustawienia puli wątków nie muszą być konfigurowane za wdrożenie i baseny mogą być ponownie wykorzystane. Tak więc zazwyczaj Spring wstrzykuje pule wątków i ustawianie ich jako prototypów, a ręczne wyłączanie ich tylko w celu oczekiwania na zakończenie wątków wydaje się mniej niż idealne.
ChssPly76,

82

ExecutorService.invokeAll() robi to dla ciebie.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

Trudność pojawia się, jeśli / kiedy zaczynasz wątki „4” pojedynczo, pod względem kawałków, a następnie dołącz / pozwól zakończyć wszystkie 4 ...
rogerdpack

@rogerdpack: Nadal uczę się tych programów i tak dalej. Ale w odpowiedzi na to, o co prosisz. Czy 4 wątki jednocześnie nie powinny być częścią zadania wsadowego wykonywanego przy użyciu powyższej odpowiedzi?
Mukul Goel,

3
Ta metoda działa tylko wtedy, gdy znasz liczbę zadań wcześniej.
Konstantin

3
Myślę, że kiedy futureszostaną zwrócone, zadania nie zostały zakończone. Mogą one zostać ukończone w przyszłości, a otrzymasz link do wyniku. Dlatego nazywa się to Future. Masz metodę Future.get () , która będzie czekać na zakończenie zadania, aby uzyskać wynik.
AlikElzin-kilaka

5
@ AlikElzin-kilaka Cytat z JavaDocs (połączony w odpowiedzi): „Wykonuje podane zadania, zwracając listę kontraktów futures zachowujących ich status i wyniki po zakończeniu wszystkich. Future.isDone () jest prawdziwe dla każdego elementu zwracanej listy. „
Hulk,

47

Możesz również korzystać z list kontraktów terminowych:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

wtedy, gdy chcesz dołączyć do nich wszystkich, jest to w zasadzie odpowiednik łączenia się na każdym z nich (z tą dodatkową korzyścią, że podnosi wyjątki od wątków potomnych do głównego):

for(Future f: this.futures) { f.get(); }

Zasadniczo sztuką jest wywoływanie .get () na każdej przyszłości, zamiast nieskończonego zapętlenia, wywołanie isDone () na (wszystkich lub każdym). Więc masz gwarancję, że „przejdziesz” przez ten blok, jak tylko skończy się ostatni wątek. Zastrzeżenie polega na tym, że ponieważ wywołanie .get () ponownie podnosi wyjątki, jeśli jeden z wątków umrze, możesz podnieść z tego prawdopodobnie przed zakończeniem innych wątków [aby tego uniknąć, możesz dodać catch ExecutionExceptionwokół wywołania get ]. Drugim zastrzeżeniem jest to, że zachowuje odniesienie do wszystkich wątków, więc jeśli mają zmienne lokalne wątku, nie zostaną zebrane, dopóki nie przejdziesz przez ten blok (chociaż możesz być w stanie obejść ten problem, jeśli stanie się to problemem, usuwając Future's off the ArrayList). Jeśli chcesz wiedzieć, która przyszłość „kończy jako pierwsza”https://stackoverflow.com/a/31885029/32453


3
Aby dowiedzieć się, który „kończy jako pierwszy”, użyj ExecutorCompletionService.take: stackoverflow.com/a/11872604/199364
ToolmakerSteve

34

W Javie 8 możesz to zrobić za pomocą CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

3
To bardzo eleganckie rozwiązanie.
mattvonb

ExecutorService es = Executors.newFixedThreadPool(4); List< Future<?>> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future<?> future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } }
user2862544

@AdamSkywalker jest wymagany po es.shutdown ()?
gaurav

@gaurav podczas wywoływania zamykania niektóre zadania mogą jeszcze nie zostać ukończone. OczekiwanieTermination zablokuje wątek wywołujący, dopóki wszystko się nie skończy. Zależy to od tego, czy musisz czekać na wyniki w tym wątku, czy nie.
AdamSkywalker,

@AdamSkywalker świetna odpowiedź. warto nie wywoływać funkcji awaitTermination (), jeśli nie muszę czekać na wyniki.
gaurav

26

Tylko moje dwa centy. Aby przezwyciężyć CountDownLatchpotrzebę wcześniejszej znajomości liczby zadań, możesz to zrobić w tradycyjny sposób, używając prostego Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

W swoim zadaniu po prostu zadzwoń s.release()jak chceszlatch.countDown();


Widząc to, najpierw zastanawiałem się, czy nie byłoby problemu, gdyby niektóre releasepołączenia miały miejsce przed acquirepołączeniem, ale po przeczytaniu dokumentacji Semafora widzę, że jest w porządku.
ToolmakerSteve

13

Trochę późno do gry, ale ze względu na zakończenie ...

Zamiast „czekać” na zakończenie wszystkich zadań, możesz pomyśleć zgodnie z zasadą Hollywood: „nie dzwoń do mnie, zadzwonię” - kiedy skończę. Myślę, że wynikowy kod jest bardziej elegancki ...

Guava oferuje kilka interesujących narzędzi do osiągnięcia tego celu.

Przykład ::

Zawiń ExecutorService w ListeningExecutorService ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Prześlij zbiór kallabów do wykonania:

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

Teraz zasadnicza część:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Załącz oddzwonienie do ListenableFuture, którego możesz użyć, aby otrzymywać powiadomienia o zakończeniu wszystkich kontraktów futures:

        Futures.addCallback(lf, new FutureCallback<List<Integer>>() {
        @Override
        public void onSuccess(List<Integer> result) {
            log.info("@@ finished processing {} elements", Iterables.size(result));
            // do something with all the results
        }

        @Override
        public void onFailure(Throwable t) {
            log.info("@@ failed because of :: {}", t);
        }
    });

Daje to również tę zaletę, że po zakończeniu przetwarzania można zebrać wszystkie wyniki w jednym miejscu ...

Więcej informacji tutaj


2
Bardzo czysto. Działa bezbłędnie nawet na Androidzie. Po prostu musiałem użyć runOnUiThread()w onSuccess().
DSchmidt,

12

CyclicBarrier klasa w Javie 5 i później jest przeznaczony do tego rodzaju rzeczy.


7
Fajnie, nigdy nie pamiętam nazwy tej struktury danych. Jednak odpowiednie tylko, jeśli wiesz wcześniej, ile zadań będzie czekało w kolejce.
ᆼ ᆺ ᆼ

Tak można by pomyśleć, że będziesz w stanie trafić barierę z bieżącego wątku, a wszystkie wątki dziecko, a potem, kiedy przeszedł wiedziałbyś nici dziecko zostało zrobione ...
rogerdpack

Właściwie to zła odpowiedź. CyclicBarrier przeznaczony do porcji. CountDownLatch przeznaczony na wydarzenie oczekujące
gstackoverflow

7

Wykonaj jedno z poniższych podejść.

  1. Iterację wszystkich przyszłych zadań, powrócił z submitna ExecutorServicei sprawdzić stan połączenia z blokowania get()na Futureobiekcie zgodnie z sugestiąKiran
  2. Użyj invokeAll()na ExecutorService
  3. CountDownLatch
  4. ForkJoinPool lub Executors.html # newWorkStealingPool
  5. Użyj shutdown, awaitTermination, shutdownNowinterfejsów API ThreadPoolExecutor w odpowiedniej kolejności

Powiązane pytania SE:

Jak CountDownLatch jest używany w wielowątkowości Java?

Jak poprawnie zamknąć java ExecutorService


6

oto dwie opcje, trochę mylące, która najlepiej wybrać.

Opcja 1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

Opcja 2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

Tutaj umieszczamy future.get (); w try catch to dobry pomysł, prawda?


5

Możesz zawinąć swoje zadania w inny program uruchamialny, który wyśle ​​powiadomienia:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

1
Chwilę zajęło mi sprawdzenie, jak to rozwiązałoby pytanie OP. Po pierwsze, zwróć uwagę, że to zawijanie dotyczy każdego zadania, a nie kodu rozpoczynającego wszystkie zadania. Przypuszczalnie każdy start zwiększyłby licznik, a każde zakończenie zmniejszyłby ten licznik lub zwiększyłby completedlicznik. Więc po uruchomieniu ich wszystkich, przy każdym powiadomieniu może ustalić, czy wszystkie zadania zostały zakończone. Należy pamiętać, że konieczne jest korzystanie z niego try/finally, aby otrzymać gotowe powiadomienie (lub alternatywne powiadomienie w catchbloku), nawet jeśli zadanie się nie powiedzie. W przeciwnym razie czekałbym wiecznie.
ToolmakerSteve

3

Właśnie napisałem przykładowy program, który rozwiązuje twój problem. Nie podano zwięzłej implementacji, więc dodam jedną. Chociaż możesz używać executor.shutdown()i executor.awaitTermination(), nie jest to najlepsza praktyka, ponieważ czas potrzebny na różne wątki byłby nieprzewidywalny.

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

Dobrze, że pokazujesz korzystanie z future.get - dobra alternatywa, o której warto wiedzieć. Ale dlaczego uważasz, że lepiej jest czekać wiecznie , niż ustawić maksymalny dopuszczalny limit czasu? Co ważniejsze, nie ma powodu, aby robić całą tę logikę, gdy można po prostu naprawdę długo czekać na termin, jeśli chcesz poczekać (zasadniczo na zawsze), aż wszystkie zadania zostaną ukończone.
ToolmakerSteve

Nie różni się to od przedstawionych tutaj rozwiązań. Twoje sprawiedliwe rozwiązanie jest takie samo, jak przedstawione przez @sjlee
pulp_fiction

Nie jestem pewien, dlaczego musisz sprawdzić, czy zostało wykonane, gdy zgodnie z dokumentem Oracle, invokeAll zwróci tylko „gdy wszystko się skończy lub upłynie limit czasu, w zależności od tego, co nastąpi wcześniej”
Mashrur,

3

Wystarczy podać tutaj inne alternatywy niż używać zapadek / barier. Możesz również uzyskać częściowe wyniki, aż wszystkie zakończą się przy użyciu CompletionService .

Z Java Concurrency w praktyce: „Jeśli masz zestaw obliczeń do przesłania do Executora i chcesz odzyskać ich wyniki, gdy staną się dostępne, możesz zachować Przyszłość związaną z każdym zadaniem i wielokrotnie sondować w celu zakończenia, dzwoniąc do get z limit czasu zero. Jest to możliwe, ale uciążliwe . Na szczęście istnieje lepszy sposób : usługa kompletacji ”.

Tutaj wdrożenie

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

3

To jest moje rozwiązanie oparte na wskazówce „AdamSkywalker” i działa

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}

2

Możesz użyć tego kodu:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

2

Utworzyłem następujący działający przykład. Chodzi o to, aby mieć sposób na przetworzenie puli zadań (używam kolejki jako przykładu) z wieloma wątkami (określanymi programowo przez liczbęOfTasks / próg) i poczekaj, aż wszystkie wątki zostaną zakończone, aby kontynuować przetwarzanie.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

Mam nadzieję, że to pomoże!


1

Możesz użyć własnej podklasy ExecutorCompletionService do zawinięcia taskExecutororaz własnej implementacji BlockingQueue, aby uzyskać informacje o zakończeniu każdego zadania i wykonać dowolne wywołanie zwrotne lub inną akcję, gdy liczba ukończonych zadań osiągnie pożądany cel.


1

powinieneś użyć executorService.shutdown()i executorService.awaitTerminationmetody.

Poniższy przykład:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

jest wymagane Oczekiwanie na zakończenie () po shutdown () /
gaurav,

1

Więc zamieszczam tutaj swoją odpowiedź z połączonego pytania, jeśli ktoś chce prostszego sposobu na zrobienie tego

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

0

Java 8 - Możemy używać interfejsu API strumienia do przetwarzania strumienia. Zobacz fragment kodu poniżej

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();

2
Cześć Vlad, witaj w StackOverflow. Czy możesz edytować swoją odpowiedź, aby wyjaśnić, w jaki sposób odpowiada ona na pytanie i co robi kod? Odradza się tylko odpowiedzi na kod. Dziękuję Ci!
Tim Malone

Ten post mówi o współbieżności. Równoległość! = Współbieżność
GabrielBB

0

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

Jeśli doSomething()rzucą inne wyjątki, latch.countDown()wydaje się, że się nie wykona, więc co powinienem zrobić?


0

jeśli użyjesz więcej wątku ExecutionServices SEQUENTIALLY i chcesz poczekać na zakończenie KAŻDEJ WYKONAWCZEJ USŁUGI. Najlepszy sposób jest jak poniżej;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

-1

To może pomóc

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }

-1

Możesz wywołać waitTillDone () w tej klasie Runner :

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

Możesz ponownie użyć tej klasy i wywołać funkcję waitTillDone () tyle razy, ile chcesz przed wywołaniem shutdown (), a ponadto Twój kod jest wyjątkowo prosty . Również nie trzeba znać się szereg zadań góry.

Aby go użyć, po prostu dodaj tę compile 'com.github.matejtymes:javafixes:1.3.1'zależność grad / maven do swojego projektu.

Więcej informacji można znaleźć tutaj:

https://github.com/MatejTymes/JavaFixes


-2

W executorze istnieje metoda, getActiveCount()która podaje liczbę aktywnych wątków.

Po rozciągnięciu wątku możemy sprawdzić, czy activeCount()wartość wynosi 0. Gdy wartość wynosi zero, oznacza to, że nie ma obecnie aktywnych wątków, co oznacza, że ​​zadanie zostało zakończone:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}

3
Nie jest to dobry pomysł, patrz stackoverflow.com/a/7271685/1166992 i javadoc: „Zwraca przybliżoną liczbę wątków, które aktywnie wykonują zadania”.
Olivier Faucheux
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.