Jak mogę obejść to ograniczenie w sytuacji, ThreadPoolExecutor
gdy kolejka musi być ograniczona i zapełniona, zanim zostanie uruchomionych więcej wątków.
Wydaje mi się, że w końcu znalazłem dość eleganckie (może trochę hakerskie) rozwiązanie tego ograniczenia za pomocą ThreadPoolExecutor
. Polega ona na rozszerzenie LinkedBlockingQueue
mieć to powrót false
do queue.offer(...)
gdy istnieją już pewne zadania w kolejce. Jeśli bieżące wątki nie nadążają za zadaniami w kolejce, TPE doda dodatkowe wątki. Jeśli pula ma już maksymalną liczbę wątków, RejectedExecutionHandler
zostanie wywołana. To program obsługujący wprowadza następnie put(...)
do kolejki.
Z pewnością dziwne jest pisanie kolejki, do której offer(...)
można wrócić false
i put()
nigdy nie blokować, więc to jest część hackowania. Ale działa to dobrze w przypadku korzystania z kolejki przez TPE, więc nie widzę żadnego problemu z robieniem tego.
Oto kod:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
Dzięki temu mechanizmowi, kiedy przesyłam zadania do kolejki, ThreadPoolExecutor
wola:
- Początkowo skaluj liczbę wątków do rozmiaru rdzenia (tutaj 1).
- Zaproponuj to w kolejce. Jeśli kolejka jest pusta, zostanie umieszczona w kolejce do obsługi przez istniejące wątki.
- Jeśli kolejka ma już 1 lub więcej elementów,
offer(...)
zwróci wartość false.
- Jeśli zwracana jest wartość false, zwiększaj liczbę wątków w puli, aż osiągną maksymalną liczbę (tutaj 50).
- Jeśli na maksimum, wywołuje
RejectedExecutionHandler
- W
RejectedExecutionHandler
odkłada zadanie w kolejce do przetworzenia przez pierwszego dostępnego wątku FIFO kolejności.
Chociaż w moim przykładowym kodzie powyżej kolejka jest nieograniczona, możesz również zdefiniować ją jako ograniczoną kolejkę. Na przykład, jeśli dodasz pojemność 1000 do pojemności, LinkedBlockingQueue
to:
- skalowanie gwintów do max
- następnie ustaw w kolejce, aż wypełni się 1000 zadań
- następnie blokuj dzwoniącego, aż kolejka będzie miała wolne miejsce.
Ponadto, jeśli potrzebujesz użyć offer(...)
w,
RejectedExecutionHandler
możesz użyć offer(E, long, TimeUnit)
metody zamiast Long.MAX_VALUE
limitu czasu.
Ostrzeżenie:
Jeśli spodziewasz się, że zadania zostaną dodane do modułu wykonawczego po jego zamknięciu, możesz mądrzej wyrzucić RejectedExecutionException
nasze niestandardowe ustawienia, RejectedExecutionHandler
gdy usługa wykonawcy została zamknięta. Dzięki @RaduToader za wskazanie tego.
Edytować:
Inną poprawką tej odpowiedzi może być zapytanie TPE, czy istnieją bezczynne wątki i umieszczenie elementu w kolejce tylko wtedy, gdy tak jest. Musiałbyś stworzyć dla tego prawdziwą klasę i dodać do niej ourQueue.setThreadPoolExecutor(tpe);
metodę.
Wtedy Twoja offer(...)
metoda może wyglądać następująco:
- Sprawdź, czy
tpe.getPoolSize() == tpe.getMaximumPoolSize()
w takim przypadku po prostu zadzwoń super.offer(...)
.
- W przeciwnym razie
tpe.getPoolSize() > tpe.getActiveCount()
zadzwoń, super.offer(...)
ponieważ wydają się być nieaktywne wątki.
- W przeciwnym razie wróć
false
do rozwidlenia innego wątku.
Może to:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
Zauważ, że metody get w TPE są drogie, ponieważ uzyskują dostęp do volatile
pól lub (w przypadku getActiveCount()
) blokują TPE i przechodzą przez listę wątków. Istnieją również warunki wyścigu, które mogą spowodować nieprawidłowe umieszczenie zadania w kolejce lub rozwidlenie innego wątku, gdy był wątek bezczynny.