Nie można utworzyć buforowanej puli wątków z ograniczeniem rozmiaru?


127

Wydaje się, że niemożliwe jest utworzenie buforowanej puli wątków z ograniczeniem liczby wątków, które może ona utworzyć.

Oto jak static Executors.newCachedThreadPool jest zaimplementowane w standardowej bibliotece Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Tak więc, używając tego szablonu do utworzenia puli wątków w pamięci podręcznej o stałym rozmiarze:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Teraz, jeśli użyjesz tego i prześlesz 3 zadania, wszystko będzie dobrze. Przesłanie dalszych zadań spowoduje odrzucenie wyjątków wykonania.

Próbuję tego:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

Spowoduje, że wszystkie wątki będą wykonywane sekwencyjnie. Oznacza to, że pula wątków nigdy nie utworzy więcej niż jednego wątku do obsługi zadań.

To jest błąd w metodzie wykonywania programu ThreadPoolExecutor? A może jest to zamierzone? Czy jest inny sposób?

Edycja: chcę coś dokładnie takiego jak buforowana pula wątków (tworzy wątki na żądanie, a następnie je zabija po pewnym czasie), ale z ograniczeniem liczby wątków, które może utworzyć i możliwością kontynuowania kolejkowania dodatkowych zadań, gdy ma osiągnął limit wątków. Zgodnie z odpowiedzią sjlee jest to niemożliwe. Patrząc na metodę execute () ThreadPoolExecutor, jest to rzeczywiście niemożliwe. Musiałbym podklasować ThreadPoolExecutor i przesłonić execute (), podobnie jak robi to SwingWorker, ale to, co robi SwingWorker w swoim execute (), to kompletny hack.


1
Jakie jest Twoje pytanie? Czy Twój drugi przykład kodu nie jest odpowiedzią na Twój tytuł?
rsp

4
Chcę mieć pulę wątków, która będzie dodawać wątki na żądanie wraz ze wzrostem liczby zadań, ale nigdy nie doda więcej niż maksymalna liczba wątków. CachedThreadPool już to robi, z tym wyjątkiem, że doda nieograniczoną liczbę wątków i nie zatrzyma się na pewnym wstępnie zdefiniowanym rozmiarze. Rozmiar, który zdefiniowałem w przykładach, to 3. Drugi przykład dodaje 1 wątek, ale nie dodaje dwóch kolejnych, gdy pojawiają się nowe zadania, podczas gdy inne zadania nie zostały jeszcze ukończone.
Matt Crinklaw-Vogt

Sprawdź to, rozwiązuje to, debugowanieisfun.blogspot.com/2012/05/…
etan

Odpowiedzi:


235

ThreadPoolExecutor ma następujące kilka kluczowych zachowań, a Twoje problemy można wyjaśnić za pomocą tych zachowań.

Po przesłaniu zadań

  1. Jeśli pula wątków nie osiągnęła rozmiaru rdzenia, tworzy nowe wątki.
  2. Jeśli osiągnięto rozmiar rdzenia i nie ma bezczynnych wątków, kolejkuje zadania.
  3. Jeśli osiągnięto rozmiar rdzenia, nie ma bezczynnych wątków, a kolejka zapełni się, tworzy nowe wątki (aż osiągnie maksymalny rozmiar).
  4. Jeśli osiągnięto maksymalny rozmiar, nie ma żadnych bezczynnych wątków, a kolejka zapełni się, rozpoczyna się zasada odrzucania.

W pierwszym przykładzie zwróć uwagę, że SynchronousQueue ma zasadniczo rozmiar 0. W związku z tym w momencie osiągnięcia maksymalnego rozmiaru (3) zaczyna obowiązywać zasada odrzucania (# 4).

W drugim przykładzie wybraną kolejką jest LinkedBlockingQueue o nieograniczonym rozmiarze. Dlatego utkniesz z zachowaniem # 2.

Nie możesz naprawdę majstrować przy typie buforowanym lub typie stałym, ponieważ ich zachowanie jest prawie całkowicie określone.

Jeśli chcesz mieć ograniczoną i dynamiczną pulę wątków, musisz użyć dodatniego rozmiaru rdzenia i maksymalnego rozmiaru w połączeniu z kolejką o skończonym rozmiarze. Na przykład,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Dodatek : jest to dość stara odpowiedź i wydaje się, że JDK zmienił swoje zachowanie, jeśli chodzi o rozmiar rdzenia równy 0. Od JDK 1.6, jeśli rozmiar rdzenia wynosi 0, a pula nie zawiera żadnych wątków, ThreadPoolExecutor doda wątku, aby wykonać to zadanie. Dlatego rozmiar rdzenia równy 0 jest wyjątkiem od powyższej reguły. Dziękuję Steve za zwrócenie mi na to uwagi.


4
Musisz napisać kilka słów o metodzie, allowCoreThreadTimeOutaby ta odpowiedź była doskonała. Zobacz odpowiedź @ user1046052
hsestupin

1
Świetna odpowiedź! Jeszcze tylko jedna kwestia: warto wspomnieć o innych zasadach odrzucania. Zobacz odpowiedź @brianegge
Jeff

1
Zachowanie 2 nie powinno brzmieć „Jeśli maksymalny rozmiar wątku został osiągnięty i nie ma bezczynnych wątków, zadania są kolejkowane”. ?
Zoltán

1
Czy możesz wyjaśnić, co implikuje rozmiar kolejki? Czy to oznacza, że ​​tylko 20 zadań można ustawić w kolejce, zanim zostaną odrzucone?
Zoltán

1
@ Zoltán Napisałem to jakiś czas temu, więc istnieje szansa, że ​​od tego czasu niektóre zachowania mogły się zmienić (nie śledziłem zbyt dokładnie ostatnich działań), ale zakładając, że te zachowania pozostają niezmienione, # 2 jest poprawny, jak stwierdzono, i to jest być może najważniejszy (i nieco zaskakujący) punkt tego. Po osiągnięciu rozmiaru rdzenia TPE preferuje kolejkowanie zamiast tworzenia nowych wątków. Rozmiar kolejki to dosłownie rozmiar kolejki przekazanej do TPE. Jeśli kolejka zapełni się, ale nie osiągnęła maksymalnego rozmiaru, utworzy nowy wątek (nie odrzuci zadań). Zobacz nr 3. Mam nadzieję, że to pomoże.
sjlee

60

O ile czegoś nie przeoczyłem, rozwiązanie pierwotnego pytania jest proste. Poniższy kod implementuje żądane zachowanie zgodnie z opisem w oryginalnym plakacie. Pojawi się do 5 wątków, aby pracować w nieograniczonej kolejce, a bezczynne wątki zostaną zakończone po 60 sekundach.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

1
Masz rację. Ta metoda została dodana w jdk 1.6, więc niewiele osób o niej wie. również nie możesz mieć „minimalnej” wielkości puli podstawowej, co jest niefortunne.
jtahlborn

4
Moim jedynym zmartwieniem jest to (z dokumentacji JDK 8): „Kiedy nowe zadanie zostanie przesłane w metodzie execute (Runnable) i uruchomionych jest mniej niż corePoolSize wątków, tworzony jest nowy wątek do obsługi żądania, nawet jeśli inny pracownik wątki są bezczynne ”.
veegee

Jestem pewien, że to nie działa. Ostatnim razem, gdy patrzyłem, wykonując powyższe czynności, wykonujesz swoją pracę tylko w jednym wątku, mimo że spawnujesz 5. Znowu minęło kilka lat, ale kiedy zagłębiłem się w implementację ThreadPoolExecutor, wysłano go do nowych wątków dopiero po zapełnieniu kolejki. Korzystanie z nieograniczonej kolejki powoduje, że nigdy się to nie dzieje. Możesz przetestować, przesyłając pracę i logując nazwę wątku, a następnie śpiąc. Każdy runnable zakończy drukowanie tej samej nazwy / nie będzie uruchamiany w żadnym innym wątku.
Matt Crinklaw-Vogt

2
To działa, Matt. Ustawiasz rozmiar rdzenia na 0, dlatego miałeś tylko 1 wątek. Sztuczka polega na tym, aby ustawić rozmiar rdzenia na maksymalny rozmiar.
T-Gergely

1
@vegee ma rację - w rzeczywistości nie działa to zbyt dobrze - ThreadPoolExecutor będzie ponownie wykorzystywać wątki tylko wtedy, gdy jest powyżej corePoolSize. Więc kiedy corePoolSize jest równy maxPoolSize, skorzystasz z buforowania wątków tylko wtedy, gdy twoja pula jest pełna (więc jeśli zamierzasz tego używać, ale zwykle pozostajesz poniżej maksymalnego rozmiaru puli, równie dobrze możesz zmniejszyć limit czasu wątku do niskiego value; i pamiętaj, że nie ma buforowania - zawsze nowe wątki)
Chris Riddell

7

Miał ten sam problem. Ponieważ żadna inna odpowiedź nie łączy wszystkich problemów, dodaję moje:

Jest teraz wyraźnie napisane dokumentacji : jeśli używasz kolejki, która nie blokuje ( LinkedBlockingQueue) maksymalnej liczby wątków, ustawienie nie ma wpływu, używane są tylko podstawowe wątki.

więc:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Ten wykonawca ma:

  1. Brak koncepcji maksymalnej liczby wątków, ponieważ używamy nieograniczonej kolejki. Jest to dobra rzecz, ponieważ taka kolejka może spowodować, że moduł wykonawczy utworzy ogromną liczbę dodatkowych wątków niebędących rdzeniami, jeśli będzie postępować zgodnie ze swoją zwykłą polityką.

  2. Kolejka o maksymalnym rozmiarze Integer.MAX_VALUE. Submit()wyrzuci, RejectedExecutionExceptionjeśli liczba oczekujących zadań przekroczyInteger.MAX_VALUE . Nie jestem pewien, czy najpierw zabraknie nam pamięci, bo tak się stanie.

  3. Możliwe 4 wątki rdzeniowe. Wątki bezczynności są automatycznie zamykane, jeśli są bezczynne przez 5 sekund.Tak więc, tak, wątki ściśle na żądanie.Liczba może być zmieniana za pomocąsetThreads() metody.

  4. Zapewnia, że ​​minimalna liczba głównych wątków nigdy nie jest mniejsza niż jeden, w przeciwnym razie submit()odrzuci każde zadanie. Ponieważ wątki podstawowe muszą być> = max wątkami, metoda setThreads()ustawia również maksymalną liczbę wątków, chociaż ustawienie maksymalnego wątku jest bezużyteczne dla nieograniczonej kolejki.


Myślę, że musisz również ustawić parametr „allowCoreThreadTimeOut” na „true”, w przeciwnym razie po utworzeniu wątków będziesz je przechowywać na zawsze: gist.github.com/ericdcobb/46b817b384f5ca9d5f5d
eric

Ups, właśnie to przegapiłem, przepraszam, więc twoja odpowiedź jest idealna!
eric

6

W pierwszym przykładzie kolejne zadania są odrzucane, ponieważ AbortPolicyjest to ustawienie domyślne RejectedExecutionHandler. ThreadPoolExecutor zawiera następujące zasady, które można zmienić za pomocą setRejectedExecutionHandlermetody:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Wygląda na to, że chcesz buforować pulę wątków z CallerRunsPolicy.


5

Żadna z odpowiedzi tutaj nie rozwiązała mojego problemu, który dotyczył tworzenia ograniczonej liczby połączeń HTTP przy użyciu klienta HTTP Apache (wersja 3.x). Ponieważ wymyślenie dobrej konfiguracji zajęło mi kilka godzin, podzielę się:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Tworzy to, ThreadPoolExecutorktóry zaczyna się od pięciu i przechowuje maksymalnie dziesięć jednocześnie uruchomionych wątków używanych CallerRunsPolicydo wykonywania.


Problem z tym rozwiązaniem polega na tym, że jeśli zwiększysz liczbę lub producentów, zwiększysz liczbę wątków obsługujących wątki w tle. W wielu przypadkach tego nie chcesz.
Gray

3

Zgodnie z Javadoc dla ThreadPoolExecutor:

Jeśli jest więcej niż corePoolSize, ale mniej niż maximumPoolSize wątków, nowy wątek zostanie utworzony tylko wtedy, gdy kolejka jest pełna . Ustawiając corePoolSize i maximumPoolSize na takie same, tworzysz pulę wątków o stałym rozmiarze.

(Podkreśl moje.)

Odpowiedź jittera jest tym, czego chcesz, chociaż moja odpowiada na twoje inne pytanie. :)


2

jest jeszcze jedna opcja. Zamiast używać nowej kolejki SynchronousQueue, możesz również użyć dowolnej innej kolejki, ale musisz upewnić się, że jej rozmiar to 1, aby zmusić usługę executorservice do utworzenia nowego wątku.


Myślę, że masz na myśli rozmiar 0 (domyślnie), więc nie będzie zadań w kolejce i naprawdę zmusić usługę wykonawczą do tworzenia nowego wątku za każdym razem.
Leonmax,

2

Nie wygląda na to, żeby którakolwiek z odpowiedzi faktycznie odpowiadała na pytanie - w rzeczywistości nie widzę sposobu na zrobienie tego - nawet jeśli podklasujesz z PooledExecutorService, ponieważ wiele metod / właściwości jest prywatnych, np. Tworzenie addIfUnderMaximumPoolSize było chronione, możesz wykonaj następujące czynności:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Najbliżej mi było to - ale nawet to nie jest zbyt dobrym rozwiązaniem

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

ps nie testował powyższego


2

Oto inne rozwiązanie. Myślę, że to rozwiązanie zachowuje się tak, jak chcesz (choć nie jest z niego dumne):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);

2

To jest to, czego chcesz (przynajmniej tak mi się wydaje). Aby uzyskać wyjaśnienie, sprawdź odpowiedź Jonathana Feinberga

Executors.newFixedThreadPool(int n)

Tworzy pulę wątków, która ponownie wykorzystuje stałą liczbę wątków działających poza udostępnioną kolejką nieograniczoną. W dowolnym momencie co najwyżej nThreads będzie aktywnych zadań przetwarzania. Jeśli dodatkowe zadania zostaną przesłane, gdy wszystkie wątki będą aktywne, będą czekać w kolejce, aż wątek będzie dostępny. Jeśli jakikolwiek wątek zakończy się z powodu awarii podczas wykonywania przed zamknięciem, nowy wątek zajmie jego miejsce, jeśli będzie to konieczne do wykonania kolejnych zadań. Wątki w puli będą istniały do ​​momentu jawnego zamknięcia.


4
Jasne, mógłbym użyć stałej puli wątków, ale pozostawiłoby to n wątków na zawsze lub do czasu wywołania shutdown. Chcę czegoś dokładnie takiego jak buforowana pula wątków (tworzy wątki na żądanie, a następnie zabija je po pewnym czasie), ale z ograniczeniem liczby wątków, które może utworzyć.
Matt Crinklaw-Vogt

0
  1. Możesz użyć ThreadPoolExecutorzgodnie z sugestią @sjlee

    Możesz dynamicznie kontrolować wielkość puli. Spójrz na to pytanie, aby uzyskać więcej informacji:

    Dynamiczna pula wątków

    LUB

  2. Możesz użyć interfejsu API newWorkStealingPool , który został wprowadzony w java 8.

    public static ExecutorService newWorkStealingPool()

    Tworzy pulę wątków do kradzieży pracy przy użyciu wszystkich dostępnych procesorów jako docelowego poziomu równoległości.

Domyślnie poziom równoległości jest ustawiony na liczbę rdzeni procesora na serwerze. Jeśli masz serwer z 4 rdzeniami procesora, rozmiar puli wątków wynosiłby 4. Ten interfejs API zwraca ForkJoinPooltyp ExecutorService i umożliwia kradzież pracy bezczynnych wątków poprzez kradzież zadań z zajętych wątków w ForkJoinPool.


0

Problem został podsumowany następująco:

Chcę czegoś dokładnie takiego jak buforowana pula wątków (tworzy wątki na żądanie, a następnie zabija je po pewnym czasie), ale z ograniczeniem liczby wątków, które może utworzyć i możliwością kontynuowania kolejkowania dodatkowych zadań po osiągnięciu limit wątku.

Zanim wskażę rozwiązanie, wyjaśnię, dlaczego poniższe rozwiązania nie działają:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Nie spowoduje to kolejkowania zadań po osiągnięciu limitu 3, ponieważ SynchronousQueue z definicji nie może przechowywać żadnych elementów.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Nie spowoduje to utworzenia więcej niż jednego wątku, ponieważ ThreadPoolExecutor tworzy tylko wątki przekraczające corePoolSize, jeśli kolejka jest pełna. Ale LinkedBlockingQueue nigdy nie jest pełny.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

Nie spowoduje to ponownego użycia wątków, dopóki nie zostanie osiągnięty corePoolSize, ponieważ ThreadPoolExecutor zwiększa liczbę wątków do momentu osiągnięcia corePoolSize, nawet jeśli istniejące wątki są bezczynne. Jeśli możesz żyć z tą wadą, to jest to najłatwiejsze rozwiązanie problemu. Jest to również rozwiązanie opisane w artykule „Współbieżność języka Java w praktyce” (przypis na str. 172).

Wydaje się, że jedynym kompletnym rozwiązaniem opisanego problemu jest przesłonięcie metody kolejki offeri napisanie a, RejectedExecutionHandlerjak wyjaśniono w odpowiedziach na to pytanie: Jak sprawić, aby ThreadPoolExecutor zwiększył wątki do maksimum przed kolejkowaniem?


0

Działa to dla Java8 + (i innych, na razie ..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

gdzie 3 to limit liczby wątków, a 5 to limit czasu dla bezczynnych wątków.

Jeśli chcesz sprawdzić, czy to działa samodzielnie , oto kod do wykonania pracy:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

wyjście powyższego kodu dla mnie to

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.