Jaka dokładnie jest różnica między wielkością puli podstawowej a maksymalnym rozmiarem puli, gdy mówimy o ThreadPoolExecutor
?
Czy można to wyjaśnić na przykładzie?
Jaka dokładnie jest różnica między wielkością puli podstawowej a maksymalnym rozmiarem puli, gdy mówimy o ThreadPoolExecutor
?
Czy można to wyjaśnić na przykładzie?
Odpowiedzi:
Weźmy ten przykład. Początkowy rozmiar puli wątków to 1, rozmiar puli podstawowej to 5, maksymalny rozmiar puli to 10, a kolejka to 100.
W miarę nadejścia żądań utworzonych zostanie do 5 wątków, a następnie zadania będą dodawane do kolejki, aż osiągnie 100. Gdy kolejka będzie pełna, zostaną utworzone nowe wątki do
maxPoolSize
. Gdy wszystkie wątki będą używane, a kolejka jest pełna, zadania zostaną odrzucone. Wraz ze zmniejszaniem się kolejki zmniejsza się liczba aktywnych wątków.
allowCoreThreadTimeOut(boolean)
która pozwala na zabijanie głównych wątków po określonym czasie bezczynności. Ustawienie wartości true i ustawienie core threads
= max threads
umożliwia skalowanie puli wątków od 0 do max threads
.
JEŚLI uruchomione wątki> corePoolSize & <maxPoolSize , a następnie utwórz nowy wątek, jeśli kolejka zadań Total jest pełna i nadchodzi nowy.
Form doc: (Jeśli jest więcej niż corePoolSize, ale mniej niż maximumPoolSize uruchomionych wątków, nowy wątek zostanie utworzony tylko wtedy, gdy kolejka jest pełna.)
Weźmy teraz prosty przykład:
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
Tutaj 5 to corePoolSize - oznacza, że Jvm utworzy nowy wątek dla nowego zadania dla pierwszych 5 zadań. a inne zadania będą dodawane do kolejki aż do zapełnienia kolejki (50 zadań).
10 to maxPoolSize - JVM może utworzyć maksymalnie 10 wątków. Oznacza, że jeśli jest już uruchomionych 5 zadań / wątków, a kolejka jest pełna z 50 oczekującymi zadaniami i jeśli w kolejce pojawi się jeszcze jedno nowe żądanie / zadanie, JVM utworzy nowy wątek do 10 (łącznie wątków = poprzednie 5 + nowe 5) ;
new ArrayBlockingQueue (50) = to całkowity rozmiar kolejki - może w niej umieścić 50 zadań.
gdy wszystkie 10 wątków są uruchomione i nadejdzie nowe zadanie, to nowe zadanie zostanie odrzucone.
Zasady tworzenia wątków wewnętrznie przez SUN:
Jeśli liczba wątków jest mniejsza niż corePoolSize, utwórz nowy wątek, aby uruchomić nowe zadanie.
Jeśli liczba wątków jest równa (lub większa) niż corePoolSize, umieść zadanie w kolejce.
Jeśli kolejka jest pełna, a liczba wątków jest mniejsza niż maxPoolSize, utwórz nowy wątek do uruchamiania zadań.
Jeśli kolejka jest pełna, a liczba wątków jest większa lub równa maxPoolSize, odrzuć zadanie.
Mam nadzieję, że to jest HelpFul ... i proszę mnie poprawić, jeśli się mylę ...
Z dokumentu :
Gdy nowe zadanie zostanie przesłane w metodzie execute (java.lang.Runnable) i uruchomionych jest mniej wątków niż corePoolSize, tworzony jest nowy wątek do obsługi żądania, nawet jeśli inne wątki robocze są bezczynne. Jeśli jest więcej uruchomionych wątków niż corePoolSize, ale mniej niż maximumPoolSize, nowy wątek zostanie utworzony tylko wtedy, gdy kolejka jest pełna.
Ponadto:
Ustawiając corePoolSize i maximumPoolSize na takie same, tworzysz pulę wątków o stałym rozmiarze. Ustawiając maximumPoolSize na zasadniczo nieograniczoną wartość, taką jak Integer.MAX_VALUE, umożliwiasz puli obsłużenie dowolnej liczby współbieżnych zadań. Najczęściej rozmiary rdzenia i maksymalne pule są ustawiane tylko podczas konstrukcji, ale można je również zmieniać dynamicznie za pomocą setCorePoolSize (int) i setMaximumPoolSize (int).
Jeśli zdecydujesz się utworzyć ThreadPoolExecutor
ręcznie zamiast używać Executors
klasy fabrycznej, będziesz musiał utworzyć i skonfigurować klasę za pomocą jednego z jej konstruktorów. Najbardziej rozbudowanym konstruktorem tej klasy jest:
public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
);
Jak widać, możesz skonfigurować:
Ograniczenie liczby wykonywanych współbieżnych zadań, zmiana rozmiaru puli wątków, stanowi ogromną korzyść dla aplikacji i jej środowiska wykonawczego pod względem przewidywalności i stabilności: nieograniczone tworzenie wątków ostatecznie wyczerpuje zasoby środowiska wykonawczego, a w konsekwencji może wystąpić sytuacja dla aplikacji , poważne problemy z wydajnością, które mogą prowadzić nawet do niestabilności aplikacji.
To rozwiązanie tylko jednej części problemu: ograniczasz liczbę wykonywanych zadań, ale nie ograniczasz liczby zadań, które można przesłać i umieścić w kolejce do późniejszego wykonania. Aplikacja doświadczy później niedoboru zasobów, ale w końcu to odczuje, jeśli wskaźnik zgłoszeń będzie stale przekraczał wskaźnik wykonania.
Rozwiązanie tego problemu jest następujące: Zapewnienie kolejki blokującej do executora w celu wstrzymania oczekujących zadań. W przypadku zapełnienia się kolejki przesłane zadanie zostanie „odrzucone”. RejectedExecutionHandler
Jest wywoływana gdy złożenie zadanie zostanie odrzucony, i dlatego został odrzucony czasownik cytowane w poprzednim punkcie. Możesz wdrożyć własną politykę odrzucania lub użyć jednej z wbudowanych zasad udostępnionych przez platformę.
Domyślne zasady odrzucania powodują, że moduł wykonawczy zgłasza plik RejectedExecutionException
. Jednak inne wbudowane zasady pozwalają:
Reguły rozmiaru puli ThreadPoolExecutor
Zasady dotyczące rozmiaru ThreadPoolExecutor's
puli są generalnie niezrozumiałe, ponieważ nie działają one tak, jak myślisz, że powinny lub w sposób, w jaki chcesz.
Weźmy ten przykład. Początkowy rozmiar puli wątków to 1, rozmiar puli podstawowej to 5, maksymalny rozmiar puli to 10, a kolejka to 100.
Na sposób firmy Sun: gdy żądania przychodzą w wątkach, zostanie utworzonych do 5, następnie zadania będą dodawane do kolejki, aż osiągnie 100. Gdy kolejka będzie pełna, zostaną utworzone nowe wątki do maxPoolSize
. Gdy wszystkie wątki będą używane, a kolejka jest pełna, zadania zostaną odrzucone. Wraz ze zmniejszaniem się kolejki zmniejsza się liczba aktywnych wątków.
Sposób przewidywany przez użytkownika: gdy żądania przychodzą w wątkach, zostanie utworzonych do 10, następnie zadania będą dodawane do kolejki, aż osiągnie 100, po czym zostaną odrzucone. Liczba wątków zmieni się na maksymalną, aż kolejka będzie pusta. Kiedy kolejka jest pusta, wątki będą ginąć, dopóki nie zostaną corePoolSize
.
Różnica polega na tym, że użytkownicy chcą zacząć zwiększać rozmiar puli wcześniej i chcą, aby kolejka była mniejsza, podczas gdy metoda Sun chce utrzymać mały rozmiar puli i zwiększyć go tylko wtedy, gdy obciążenie stanie się zbyt duże.
Oto proste zasady Sun dotyczące tworzenia wątków:
corePoolSize
, utwórz nowy wątek, aby uruchomić nowe zadanie.corePoolSize
, umieść zadanie w kolejce.maxPoolSize
, utwórz nowy wątek do uruchamiania zadań.maxPoolSize
, odrzuć zadanie. Krótko mówiąc, nowe wątki są tworzone tylko wtedy, gdy kolejka się zapełnia, więc jeśli używasz kolejki nieograniczonej, liczba wątków nie przekroczy corePoolSize
.Aby uzyskać pełniejsze wyjaśnienie, pobierz to z paszczy konia: ThreadPoolExecutor
dokumentacja API.
Jest naprawdę dobry post na forum, który przedstawia sposób, w jaki ThreadPoolExecutor
działa z przykładami kodu: http://forums.sun.com/thread.jspa?threadID=5401400&tstart=0
Więcej informacji: http://forums.sun.com/thread.jspa?threadID=5224557&tstart=450
Definicję terminów corepoolsize i maxpoolsize można znaleźć w pliku javadoc. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
Powyższy link zawiera odpowiedź na Twoje pytanie. Jednak tylko po to, aby było jasne. Aplikacja będzie kontynuować tworzenie wątków, aż osiągnie corePoolSize. Myślę, że chodzi tutaj o to, aby tych wiele wątków wystarczyło do obsługi napływu zadań. Jeśli nowe zadanie pojawi się po utworzeniu wątków corePoolSize, zadania zostaną umieszczone w kolejce. Gdy kolejka będzie pełna, executor rozpocznie tworzenie nowych wątków. To rodzaj równoważenia. Zasadniczo oznacza to, że napływ zadań jest czymś więcej niż możliwościami przetwarzania. Zatem Executor zacznie ponownie tworzyć nowe wątki, aż osiągnie maksymalną liczbę wątków. Ponownie, nowe wątki zostaną utworzone wtedy i tylko wtedy, gdy kolejka jest pełna.
Dobre wyjaśnienie na tym blogu:
public class ThreadPoolExecutorExample {
public static void main (String[] args) {
createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(3), "Bounded");
createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");
createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");
}
private static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,
String msg) {
System.out.println("---- " + msg + " queue instance = " +
queue.getClass()+ " -------------");
ThreadPoolExecutor e = new ThreadPoolExecutor(2, 5, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, queue);
for (int i = 0; i < 10; i++) {
try {
e.execute(new Task());
} catch (RejectedExecutionException ex) {
System.out.println("Task rejected = " + (i + 1));
}
printStatus(i + 1, e);
}
e.shutdownNow();
System.out.println("--------------------\n");
}
private static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {
StringBuilder s = new StringBuilder();
s.append("poolSize = ")
.append(e.getPoolSize())
.append(", corePoolSize = ")
.append(e.getCorePoolSize())
.append(", queueSize = ")
.append(e.getQueue()
.size())
.append(", queueRemainingCapacity = ")
.append(e.getQueue()
.remainingCapacity())
.append(", maximumPoolSize = ")
.append(e.getMaximumPoolSize())
.append(", totalTasksSubmitted = ")
.append(taskSubmitted);
System.out.println(s.toString());
}
private static class Task implements Runnable {
@Override
public void run () {
while (true) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
break;
}
}
}
}
}
Wynik :
---- Bounded queue instance = class java.util.concurrent.ArrayBlockingQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueCapacity = 1, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 3, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 4, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
---- Unbounded queue instance = class java.util.concurrent.LinkedBlockingDeque -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2147483646, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueRemainingCapacity = 2147483645, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 2147483644, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 2, corePoolSize = 2, queueSize = 4, queueRemainingCapacity = 2147483643, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 2, corePoolSize = 2, queueSize = 5, queueRemainingCapacity = 2147483642, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 2, corePoolSize = 2, queueSize = 6, queueRemainingCapacity = 2147483641, maximumPoolSize = 5, totalTasksSubmitted = 8
poolSize = 2, corePoolSize = 2, queueSize = 7, queueRemainingCapacity = 2147483640, maximumPoolSize = 5, totalTasksSubmitted = 9
poolSize = 2, corePoolSize = 2, queueSize = 8, queueRemainingCapacity = 2147483639, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
---- Direct hand-off queue instance = class java.util.concurrent.SynchronousQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 3, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 4, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
Task rejected = 6
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
Task rejected = 7
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
Task rejected = 8
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
Process finished with exit code 0
Z książki Java concurency essentials :
CorePoolSize : ThreadPoolExecutor ma atrybut corePoolSize, który określa, ile wątków rozpocznie, dopóki nowe wątki nie zostaną uruchomione, gdy kolejka jest pełna
MaximumPoolSize : Ten atrybut określa, jak wiele wątków są uruchomione na maksimum. Możesz ustawić to na Integer. MAX_VALUE, aby nie mieć górnej granicy
java.util.concurrent.ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Zrozumienie wewnętrznego zachowania ThreadPoolExecutor
po przesłaniu nowego zadania pomogło mi zrozumieć, corePoolSize
czym się maximumPoolSize
różnią.
Pozwolić:
N
być liczba wątków w basenie getPoolSize()
. Aktywne wątki + nieaktywne wątki.T
być liczbą zadań przekazanych wykonawcy / puli.C
być podstawą wielkości basen, getCorePoolSize()
. Ile maksymalnie wątków można utworzyć na pulę dla zadań przychodzących, zanim nowe zadania trafią do kolejki .M
być maksymalna wielkość basenu, getMaximumPoolSize()
. Maksymalna liczba wątków, które może przydzielić pula.Zachowania ThreadPoolExecutor
w Javie po przesłaniu nowego zadania:
N <= C
przypadku bezczynnych wątków nie jest przypisywane nowe zadanie przychodzące, zamiast tego tworzony jest nowy wątek.N > C
i jeśli są bezczynne wątki, to jest tam przypisywane nowe zadanie.N > C
i jeśli NIE ma bezczynnych wątków, nowe zadania są umieszczane w kolejce. NIE STWORZONO TUTAJ NOWYCH NITKÓW.M
. Jeśli M
zostanie osiągnięty, odrzucamy zadania. Ważne, aby tego nie było, to fakt, że nie tworzymy nowych wątków, dopóki kolejka nie będzie pełna!Źródła:
corePoolSize = 0
i maximumPoolSize = 10
z pojemnością kolejki 50
.Spowoduje to powstanie jednego aktywnego wątku w puli, dopóki kolejka nie będzie zawierała 50 elementów.
executor.execute(task #1):
before task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
after task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
[task #1 immediately queued and kicked in b/c the very first thread is created when `workerCountOf(recheck) == 0`]
execute(task #2):
before task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
after task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
[task #2 not starting before #1 is done]
... executed a few tasks...
execute(task #19)
before task #19 submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 17, completed tasks = 0]
after task #19 submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 18, completed tasks = 0]
...
execute(task #51)
before task submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 50, completed tasks = 0]
after task submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 2, active threads = 2, queued tasks = 50, completed tasks = 0]
Queue is full.
A new thread was created as the queue was full.
corePoolSize = 10
i maximumPoolSize = 10
z pojemnością kolejki 50
.Spowoduje to utworzenie 10 aktywnych wątków w puli. Gdy kolejka zawiera 50 pozycji, zadania zostaną odrzucone.
execute(task #1)
before task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
after task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
execute(task #2)
before task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
after task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
execute(task #3)
before task #3 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
after task #3 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
... executed a few tasks...
execute(task #11)
before task #11 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0]
after task #11 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 1, completed tasks = 0]
... executed a few tasks...
execute(task #51)
before task #51 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 50, completed tasks = 0]
Task was rejected as we have reached `maximumPoolSize`.