Występuje problem ThreadPoolExecutor
polegający na RejectedExecutionException
tym, że jeśli próbuję zmienić rozmiar puli podstawowej na inną liczbę po utworzeniu puli, to sporadycznie niektóre zadania są odrzucane, chociaż nigdy nie przesyłam więcej niż queueSize + maxPoolSize
liczby zadań.
Problem, który próbuję rozwiązać, to rozszerzenie, ThreadPoolExecutor
które zmienia rozmiar swoich głównych wątków w oparciu o oczekujące wykonania znajdujące się w kolejce puli wątków. Potrzebuję tego, ponieważ domyślnie a ThreadPoolExecutor
utworzy nowy Thread
tylko wtedy, gdy kolejka jest pełna.
Oto mały samodzielny program Pure Java 8, który demonstruje problem.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Dlaczego pula powinna kiedykolwiek zgłaszać wyjątek RejectedExecutionException, jeśli nigdy nie przesyłam więcej niż kolejkaCapacity + maxThreads. Nigdy nie zmieniam maksymalnej liczby wątków, więc zgodnie z definicją ThreadPoolExecutor powinien on albo dostosować zadanie do wątku, albo do kolejki.
Oczywiście, jeśli nigdy nie zmieniam wielkości puli, to pula wątków nigdy nie odrzuca żadnych zgłoszeń. Jest to również trudne do debugowania, ponieważ dodanie jakichkolwiek opóźnień w zgłoszeniach sprawia, że problem znika.
Wszelkie wskazówki, jak naprawić RejectedExecutionException?
ThreadPoolExecutor
jest prawdopodobnie złym pomysłem i czy nie musisz zmieniać istniejącego kodu również w tym przypadku? Najlepiej byłoby podać przykład, w jaki sposób Twój kod uzyskuje dostęp do modułu wykonującego. Byłbym zaskoczony, gdyby używał wielu metod specyficznych dla ThreadPoolExecutor
(tj. Nie w ExecutorService
).
ExecutorService
, pakując istniejącą, która ponownie przesyła zadania, których przesłanie nie powiodło się z powodu zmiany rozmiaru?