Wykonywacze Java: jak otrzymywać powiadomienia, bez blokowania, o zakończeniu zadania?


154

Powiedzmy, że mam kolejkę pełną zadań, które muszę przesłać do usługi wykonawczej. Chcę, aby były przetwarzane pojedynczo. Najprościej przychodzi mi do głowy:

  1. Weź zadanie z kolejki
  2. Prześlij go do wykonawcy
  3. Wywołaj .get na zwróconym Future i blokuj, aż wynik będzie dostępny
  4. Weź kolejne zadanie z kolejki ...

Jednak staram się całkowicie uniknąć blokowania. Jeśli mam 10000 takich kolejek, które wymagają przetwarzania ich zadań pojedynczo, zabraknie mi miejsca na stosie, ponieważ większość z nich będzie trzymać się zablokowanych wątków.

Chciałbym przesłać zadanie i zapewnić oddzwonienie, które zostanie wywołane po zakończeniu zadania. Użyję tego powiadomienia zwrotnego jako flagi do wysłania następnego zadania. (Functionjava i jetlang najwyraźniej używają takich nieblokujących algorytmów, ale nie mogę zrozumieć ich kodu)

Jak mogę to zrobić za pomocą java.util.concurrent JDK, poza napisaniem własnej usługi wykonawczej?

(kolejka, która dostarcza mi te zadania, może sama się blokować, ale jest to problem, który należy rozwiązać później)

Odpowiedzi:


146

Zdefiniuj interfejs wywołania zwrotnego, aby otrzymywać wszelkie parametry, które chcesz przekazać w powiadomieniu o zakończeniu. Następnie wywołaj go na końcu zadania.

Możesz nawet napisać ogólne opakowanie dla zadań Runnable i przesłać je do ExecutorService. Lub zobacz poniżej mechanizm wbudowany w Javę 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Z CompletableFutureJava 8 zawarte dokładniejszym środki komponować rurociągów gdzie procesy mogą być zrealizowanych asynchronicznie i warunkowo. Oto wymyślony, ale kompletny przykład powiadomienia.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

1
Trzy odpowiedzi w mgnieniu oka! Podoba mi się CallbackTask, takie proste i proste rozwiązanie. Z perspektywy czasu wydaje się to oczywiste. Dzięki. Odnośnie innych komentarzy na temat SingleThreadedExecutor: mogę mieć tysiące kolejek, które mogą mieć tysiące zadań. Każdy z nich musi przetwarzać swoje zadania pojedynczo, ale różne kolejki mogą działać równolegle. Dlatego używam jednej globalnej puli wątków. Jestem nowy w wykonawcach, więc proszę, powiedz mi, jeśli się mylę.
Shahbaz

5
Dobry wzorzec, jednak użyłbym przyszłego, możliwego do słuchania API Guavy, które zapewnia bardzo dobrą implementację.
Pierre-Henri

czy to nie jest sprzeczne z celem korzystania z Future?
takecare

2
@Zelphir To był Callbackinterfejs, który deklarujesz; nie z biblioteki. Obecnie I pewnie po prostu użyć Runnable, Consumerlub BiConsumer, w zależności od tego, co muszę przejść z powrotem od zadania do słuchacza.
erickson

1
@Bhargav Jest to typowe dla wywołań zwrotnych - jednostka zewnętrzna „oddzwania” do jednostki kontrolującej. Czy chcesz, aby wątek, który utworzył zadanie, był blokowany do czasu zakończenia zadania? Jaki jest zatem cel uruchamiania zadania w drugim wątku? Jeśli pozwolisz wątkowi kontynuować, będzie musiał wielokrotnie sprawdzać stan współdzielenia (prawdopodobnie w pętli, ale zależy to od twojego programu), dopóki nie zauważy aktualizacji (flaga boolowska, nowy element w kolejce itp.) Dokonanej przez true oddzwonienie zgodnie z opisem w tej odpowiedzi. Może wtedy wykonać dodatkową pracę.
erickson

52

W Javie 8 możesz użyć CompletableFuture . Oto przykład, który miałem w moim kodzie, w którym używam go do pobierania użytkowników z mojej usługi użytkownika, mapowania ich na obiekty widoku, a następnie aktualizowania widoku lub wyświetlania okna dialogowego błędu (jest to aplikacja GUI):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Wykonuje się asynchronicznie. Używam dwóch prywatnych metod: mapUsersToUserViewsi updateView.


Jak można używać CompletableFuture z executorem? (aby ograniczyć liczbę współbieżnych / równoległych instancji) Czy byłaby to wskazówka: cf: przesyłanie-futuretasks-to-an-executor-Why-does-it-work ?
user1767316

47

Użyj przyszłego API, które można nasłuchiwać, i dodaj wywołanie zwrotne. Por. ze strony internetowej:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Cześć, ale jeśli chcę zatrzymać po tym wątku onSuccess, jak mogę to zrobić?
DevOps85,

24

Możesz rozszerzyć FutureTaskklasę i przesłonić done()metodę, a następnie dodać FutureTaskobiekt do klasy ExecutorService, dzięki czemu done()metoda zostanie wywołana FutureTasknatychmiast po zakończeniu.


then add the FutureTask object to the ExecutorService, czy mógłbyś mi powiedzieć, jak to zrobić?
Gary Gauh

@GaryGauh zobacz to, aby uzyskać więcej informacji, które możesz rozszerzyć FutureTask, możemy nazwać to MyFutureTask. Następnie użyj ExcutorService, aby przesłać MyFutureTask, a następnie uruchomiona zostanie metoda MyFutureTask, a po zakończeniu MyFutureTask zostanie wywołana gotowa metoda.Tutaj coś mylącego to dwa FutureTask, aw rzeczywistości MyFutureTask jest normalnym Runnable.
Chaojun Zhong

15

ThreadPoolExecutorma również beforeExecutei afterExecutemetody przechwytywania, które można przesłonić i wykorzystać. Oto opis od ThreadPoolExecutor„s Javadocs .

Metody hakowe

Ta klasa udostępnia chronione, które można zastąpić beforeExecute(java.lang.Thread, java.lang.Runnable)i afterExecute(java.lang.Runnable, java.lang.Throwable)metody, które są wywoływane przed i po wykonaniu każdego zadania. Można ich używać do manipulowania środowiskiem wykonawczym; na przykład ponowna inicjalizacja ThreadLocals, zbieranie statystyk lub dodawanie wpisów do dziennika. Ponadto metodę terminated()można nadpisać, aby wykonać specjalne przetwarzanie, które należy wykonać po Executorcałkowitym zakończeniu działania. Jeśli metody przechwytujące lub wywołania zwrotne zgłaszają wyjątki, wewnętrzne wątki robocze mogą z kolei zakończyć się niepowodzeniem i nagle zakończyć.


6

Użyj CountDownLatch.

To jest java.util.concurrentdokładnie sposób, w jaki należy czekać na zakończenie wykonywania kilku wątków przed kontynuowaniem.

Aby uzyskać efekt oddzwaniania, którym się opiekujesz, wymaga to trochę dodatkowej pracy. Mianowicie załatwienie tego samodzielnie w osobnym wątku, który używa CountDownLatchi czeka na to, a następnie powiadamia o tym, co chcesz powiadomić. Nie ma natywnej obsługi wywołań zwrotnych ani niczego podobnego do tego efektu.


EDYCJA: teraz, kiedy lepiej rozumiem twoje pytanie, myślę, że posuwasz się za daleko, niepotrzebnie. Jeśli bierzesz zwykły SingleThreadExecutor, daj mu wszystkie zadania, a kolejkowanie wykona natywnie.


Jaki jest najlepszy sposób na sprawdzenie, czy wszystkie wątki zostały zakończone za pomocą SingleThreadExecutor? Widziałem przykład, który zajmuje trochę czasu! Executor.isTerminated, ale nie wydaje się to zbyt eleganckie. Zaimplementowałem funkcję oddzwaniania dla każdego pracownika i zwiększam liczbę, która działa.
Bear

5

Jeśli chcesz mieć pewność, że żadne zadania nie będą uruchamiane w tym samym czasie, użyj SingleThreadedExecutor . Zadania będą realizowane w kolejności, w jakiej zostały przesłane. Nie musisz nawet wstrzymywać zadań, po prostu prześlij je do exec.


2

Prosty kod do implementacji Callbackmechanizmu przy użyciuExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

wynik:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Kluczowe uwagi:

  1. Jeśli chcesz czynności procesowych w sekwencji w porządku FIFO, wymienić newFixedThreadPool(5) znewFixedThreadPool(1)
  2. Jeśli chcesz przetworzyć następne zadanie po przeanalizowaniu wyniku callbackpoprzedniego zadania, po prostu odznacz poniżej linię

    //System.out.println("future status:"+future.get()+":"+future.isDone());
  3. Możesz zastąpić newFixedThreadPool()jednym z

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor

    w zależności od przypadku użycia.

  4. Jeśli chcesz asynchronicznie obsługiwać metodę wywołania zwrotnego

    za. Przekaż ExecutorService or ThreadPoolExecutorzadanie udostępnione do wywoływanego

    b. Zamień swoją Callablemetodę na Callable/Runnablezadanie

    do. Przekaż zadanie zwrotne do ExecutorService or ThreadPoolExecutor


1

Aby dodać do odpowiedzi Matta, która pomogła, oto bardziej szczegółowy przykład pokazujący użycie wywołania zwrotnego.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Wynik to:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

1

Możesz użyć implementacji wywoływanej, takiej że

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

gdzie CallbackInterface jest czymś bardzo podstawowym, jak

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

a teraz główna klasa będzie wyglądać tak

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);

1

To jest rozszerzenie odpowiedzi Pache'a używającej guawy ListenableFuture.

W szczególności Futures.transform()zwraca ListenableFuturetak, może być używany do łączenia wywołań asynchronicznych. Futures.addCallback()zwraca void, więc nie może być używany do łączenia w łańcuch, ale jest dobry do obsługi sukcesu / niepowodzenia w przypadku zakończenia asynchronicznego.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

UWAGA: Oprócz łączenia zadań asynchronicznych w łańcuch Futures.transform()umożliwia również zaplanowanie każdego zadania w osobnym module wykonawczym (nie pokazano w tym przykładzie).


Wydaje się to całkiem miłe.
kaiser
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.