Czy istnieje usługa ExecutorService, która używa bieżącego wątku?


94

Poszukuję zgodnego sposobu konfigurowania korzystania z puli wątków lub nie. W idealnym przypadku nie powinno to mieć żadnego wpływu na resztę kodu. Mógłbym użyć puli wątków z 1 wątkiem, ale nie jest to do końca to, czego chcę. Jakieś pomysły?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc

Odpowiedzi:


70

Oto naprawdę prosta Executor(nie ExecutorService, pamiętajcie) implementacja, która używa tylko bieżącego wątku. Kradzież z „Java Concurrency in Practice” (lektura niezbędna).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService jest bardziej rozbudowanym interfejsem, ale można go obsługiwać w ten sam sposób.


4
+1: Jak mówisz, ExecutorService może być obsługiwany w ten sam sposób, być może przez podklasę AbstractExecutorService.
Paul Cager,

@Paul Tak, AbstractExecutorServicewygląda na to, że należy iść.
przemyślenie

15
W Javie8 możesz to zredukować do samegoRunnable::run
Jon Freedman

@Juude zawsze będzie działać w wątku, który wywołuje moduł wykonawczy.
Gustav Karlsson

Czy nie chodzi o wykonanie tego samego wątku, aby móc zaplanować więcej zadań z poziomu metody execute ()? Ta odpowiedź nie wystarczy. Nie mogę znaleźć odpowiedzi, która by to satysfakcjonowała.
haelix

82

Możesz użyć guawy MoreExecutors.newDirectExecutorService()lub MoreExecutors.directExecutor()jeśli nie potrzebujesz ExecutorService.

Jeśli dołączenie guawy jest zbyt ciężkie, możesz zaimplementować coś prawie tak dobrego:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}

1
W przypadku systemu Android zwraca Executors.unconfigurableExecutorService (instancja);
Maragues

jeśli wszystko, czego używamy, to bieżący wątek , po co prymitywy synchronizacji? dlaczego zatrzask?
haelix

@haelix zatrzask jest potrzebny, ponieważ nawet jeśli praca jest wykonywana w tym samym wątku, co ten, który dodał pracę, każdy wątek może zamknąć executor.
NamshubWriter

64

Styl Java 8:

Executor e = Runnable::run;


8
Absolutnie brudne. Kocham to.
Rogue

Co w tym brzydkiego? To eleganckie :)
lpandzic

1
To najlepszy rodzaj brudnego @Ipandzic, jest niezwykły i zwięzły.
Rogue

12

Napisałem na ExecutorServicepodstawie AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}

zakończone pole nie jest chronione z synchronizacją.
Daneel Yaitskov

1
Pole @ DaneelS.Yaitskov terminatednie będzie korzystać z dostępu zsynchronizowanego w oparciu o kod, który jest tutaj. Operacje na polach 32-bitowych są w Javie niepodzielne.
Christopher Schultz

Przypuszczam, że metoda isTerminated () w powyższym nie jest do końca poprawna, ponieważ isTerminated () ma zwracać wartość true tylko wtedy, gdy nie ma aktualnie wykonywanych zadań. Guawa śledzi liczbę zadań w innej zmiennej, prawdopodobnie dlatego chronią obie zmienne za pomocą blokady.
Jeremy K

7

Możesz użyć RejectedExecutionHandler, aby uruchomić zadanie w bieżącym wątku.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Potrzebujesz tylko jednego z nich.


Sprytny! Jak bezpieczne jest to (szczere pytanie)? Czy istnieją sposoby na odrzucenie zadania, w którym faktycznie nie chciałbyś go wykonać w bieżącym wątku? Czy zadania są odrzucane, jeśli ExecutorService jest zamykany lub przerywany?
przemyślenie

Ponieważ maksymalny rozmiar to 0, każde zadanie jest odrzucane. Jednak odrzucone zachowanie ma działać w bieżącym wątku. Problem byłby tylko wtedy, gdyby zadanie NIE zostało odrzucone.
Peter Lawrey,

8
uwaga, istnieje już implementacja tej polityki, nie ma potrzeby definiowania własnej java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
jtahlborn

7
Nie jest już możliwe utworzenie ThreadPoolExecutor z maksymalnym rozmiarem puli 0. Sądzę, że byłoby możliwe odtworzenie zachowania przy użyciu blockingQueue o rozmiarze 0, ale żadna domyślna implementacja nie wydaje się na to zezwalać.
Axelle Ziegler

to nie skompiluje się z powodu {code} if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize <corePoolSize || keepAliveTime <0) {code} w java.util.ThreadPoolExecutor (przynajmniej openJdk 7)
Bogdan

7

Musiałem użyć tego samego "CurrentThreadExecutorService" do celów testowych i chociaż wszystkie sugerowane rozwiązania były fajne (szczególnie ta, w której wspomina się o guawie ), wymyśliłem coś podobnego do tego, co zasugerował tutaj Peter Lawrey .

Jak wspomniano przez Axelle Ziegler tutaj , niestety rozwiązanie Piotra nie będą faktycznie pracować z powodu kontroli wprowadzonego ThreadPoolExecutorna maximumPoolSizeparametrze konstruktora (czyli maximumPoolSizenie może być <=0).

Aby to obejść, wykonałem następujące czynności:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.