W rozwiązaniu podanym przez BalusC główny wątek pozostanie zablokowany na czas oczekiwania. Jeśli masz pulę wątków z więcej niż jednym wątkiem, potrzebujesz tej samej liczby dodatkowych wątków, które będą korzystać z Future.get (długi limit czasu, jednostka TimeUnit) blokującego wywołanie, aby czekać i zamykać wątek, jeśli przekroczy on limit czasu.
Ogólnym rozwiązaniem tego problemu jest utworzenie programu ThreadPoolExecutor Decorator, który może dodać funkcję limitu czasu. Ta klasa Dekoratora powinna tworzyć tyle wątków, ile ma ThreadPoolExecutor, a wszystkie te wątki powinny być używane tylko do oczekiwania i zamknięcia ThreadPoolExecutor.
Klasę ogólną należy zaimplementować jak poniżej:
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolDecorator extends ThreadPoolExecutor {
private final ThreadPoolExecutor commandThreadpool;
private final long timeout;
private final TimeUnit unit;
public TimeoutThreadPoolDecorator(ThreadPoolExecutor threadpool,
long timeout,
TimeUnit unit ){
super( threadpool.getCorePoolSize(),
threadpool.getMaximumPoolSize(),
threadpool.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS,
threadpool.getQueue());
this.commandThreadpool = threadpool;
this.timeout=timeout;
this.unit=unit;
}
@Override
public void execute(Runnable command) {
super.execute(() -> {
Future<?> future = commandThreadpool.submit(command);
try {
future.get(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException e) {
throw new RejectedExecutionException(e);
} finally {
future.cancel(true);
}
});
}
@Override
public void setCorePoolSize(int corePoolSize) {
super.setCorePoolSize(corePoolSize);
commandThreadpool.setCorePoolSize(corePoolSize);
}
@Override
public void setThreadFactory(ThreadFactory threadFactory) {
super.setThreadFactory(threadFactory);
commandThreadpool.setThreadFactory(threadFactory);
}
@Override
public void setMaximumPoolSize(int maximumPoolSize) {
super.setMaximumPoolSize(maximumPoolSize);
commandThreadpool.setMaximumPoolSize(maximumPoolSize);
}
@Override
public void setKeepAliveTime(long time, TimeUnit unit) {
super.setKeepAliveTime(time, unit);
commandThreadpool.setKeepAliveTime(time, unit);
}
@Override
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
super.setRejectedExecutionHandler(handler);
commandThreadpool.setRejectedExecutionHandler(handler);
}
@Override
public List<Runnable> shutdownNow() {
List<Runnable> taskList = super.shutdownNow();
taskList.addAll(commandThreadpool.shutdownNow());
return taskList;
}
@Override
public void shutdown() {
super.shutdown();
commandThreadpool.shutdown();
}
}
Powyższy dekorator może być używany jak poniżej:
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args){
long timeout = 2000;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(true));
threadPool = new TimeoutThreadPoolDecorator( threadPool ,
timeout,
TimeUnit.MILLISECONDS);
threadPool.execute(command(1000));
threadPool.execute(command(1500));
threadPool.execute(command(2100));
threadPool.execute(command(2001));
while(threadPool.getActiveCount()>0);
threadPool.shutdown();
}
private static Runnable command(int i) {
return () -> {
System.out.println("Running Thread:"+Thread.currentThread().getName());
System.out.println("Starting command with sleep:"+i);
try {
Thread.sleep(i);
} catch (InterruptedException e) {
System.out.println("Thread "+Thread.currentThread().getName()+" with sleep of "+i+" is Interrupted!!!");
return;
}
System.out.println("Completing Thread "+Thread.currentThread().getName()+" after sleep of "+i);
};
}
}