Następująca klasa otacza ThreadPoolExecutor i używa Semaphore do blokowania, a następnie kolejka robocza jest pełna:
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
Ta klasa opakowania jest oparta na rozwiązaniu przedstawionym w książce Java Concurrency in Practice autorstwa Briana Goetza. Rozwiązanie opisane w książce przyjmuje tylko dwa parametry konstruktora: an Executor
i ograniczenie używane dla semafora. Pokazuje to odpowiedź udzielona przez Fixpoint. Jest problem z tym podejściem: może dojść do stanu, w którym wątki puli są zajęte, kolejka jest pełna, ale semafor właśnie wydał zezwolenie. ( semaphore.release()
w ostatnim bloku). W tym stanie nowe zadanie może pobrać właśnie wydane zezwolenie, ale jest odrzucane, ponieważ kolejka zadań jest pełna. Oczywiście nie jest to coś, czego chcesz; chcesz zablokować w tym przypadku.
Aby rozwiązać ten problem, musimy użyć nieograniczonej kolejki, o czym wyraźnie wspomina JCiP. Semafor działa jako strażnik, dając efekt wirtualnego rozmiaru kolejki. Ma to taki efekt uboczny, że jest możliwe, że jednostka może zawierać maxPoolSize + virtualQueueSize + maxPoolSize
zadania. Dlaczego? Z powodu
semaphore.release()
w ostatnim bloku. Jeśli wszystkie wątki puli wywołują tę instrukcję w tym samym czasie, maxPoolSize
zezwolenia są zwalniane, pozwalając tej samej liczbie zadań na wejście do jednostki. Gdybyśmy używali ograniczonej kolejki, nadal byłaby pełna, co spowodowałoby odrzucenie zadania. Teraz, ponieważ wiemy, że dzieje się tak tylko wtedy, gdy wątek puli jest prawie gotowy, nie stanowi to problemu. Wiemy, że wątek puli nie będzie się blokował, więc zadanie wkrótce zostanie pobrane z kolejki.
Możesz jednak użyć ograniczonej kolejki. Tylko upewnij się, że jego rozmiar jest równy virtualQueueSize + maxPoolSize
. Większe rozmiary są bezużyteczne, semafor uniemożliwi wpuszczenie większej liczby elementów. Mniejsze rozmiary spowodują odrzucenie zadań. Szansa na odrzucenie zadań rośnie wraz ze zmniejszaniem się rozmiaru. Na przykład, powiedzmy, że chcesz mieć ograniczony moduł wykonawczy z maxPoolSize = 2 i virtualQueueSize = 5. Następnie weź semafor z 5 + 2 = 7 zezwoleniami i rzeczywistym rozmiarem kolejki 5 + 2 = 7. Rzeczywista liczba zadań, które mogą znajdować się w jednostce, to 2 + 5 + 2 = 9. Kiedy moduł wykonawczy jest pełny (5 zadań w kolejce, 2 w puli wątków, więc 0 zezwoleń jest dostępnych) i WSZYSTKIE wątki puli zwalniają swoje zezwolenia, wtedy nadchodzące zadania mogą otrzymać dokładnie 2 zezwolenia.
Teraz rozwiązanie JCiP jest nieco kłopotliwe w użyciu, ponieważ nie wymusza wszystkich tych ograniczeń (nieograniczonej kolejki lub ograniczeń matematycznych itp.). Myślę, że jest to tylko dobry przykład pokazujący, jak można budować nowe klasy bezpiecznych wątków w oparciu o części, które są już dostępne, ale nie jako w pełni rozwiniętą klasę wielokrotnego użytku. Nie sądzę, żeby to drugie było intencją autora.