Niestandardowa pula wątków w równoległym strumieniu Java 8


398

Czy można określić niestandardową pulę wątków dla równoległego strumienia Java 8 ? Nie mogę tego nigdzie znaleźć.

Wyobraź sobie, że mam aplikację serwera i chciałbym korzystać z równoległych strumieni. Ale aplikacja jest duża i wielowątkowa, więc chcę ją podzielić na części. Nie chcę wolno działającego zadania w jednym module zadań bloku aplikacji z innego modułu.

Jeśli nie mogę użyć różnych pul wątków dla różnych modułów, oznacza to, że nie mogę bezpiecznie używać równoległych strumieni w większości rzeczywistych sytuacji.

Wypróbuj następujący przykład. Niektóre zadania intensywnie wykorzystujące procesor są wykonywane w osobnych wątkach. Zadania wykorzystują równoległe strumienie. Pierwsze zadanie jest zepsute, więc każdy krok zajmuje 1 sekundę (symulowany przez uśpienie wątku). Problem polega na tym, że inne wątki blokują się i czekają na zakończenie przerwanego zadania. To wymyślony przykład, ale wyobraź sobie aplikację serwletu i kogoś, kto przesyła długo działające zadanie do wspólnej puli dołączania wideł.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

3
Co rozumiesz przez niestandardową pulę wątków? Istnieje jeden wspólny ForkJoinPool, ale zawsze możesz utworzyć własny ForkJoinPool i przesyłać do niego żądania.
edharned

7
Podpowiedź: mistrz Java Heinz Kabutz sprawdza ten sam problem, ale z jeszcze gorszym skutkiem: zakleszczone wątki wspólnej puli złączonych wideł. Zobacz javaspecialists.eu/archive/Issue223.html
Peti

Odpowiedzi:


395

W rzeczywistości istnieje pewna sztuczka, jak wykonać równoległą operację w konkretnej puli sprzężenia wideł. Jeśli wykonasz go jako zadanie w puli łączenia widelca, pozostanie tam i nie użyje wspólnej.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

Trik jest oparty na ForkJoinTask.fork, który określa: „Organizuje asynchroniczne wykonanie tego zadania w puli, w której działa bieżące zadanie, jeśli ma zastosowanie, lub za pomocą ForkJoinPool.commonPool (), jeśli nie inForkJoinPool ()”


20
Szczegóły dotyczące rozwiązania są opisane tutaj blog.krecan.net/2014/03/18/…
Lukas

3
Ale czy określono również, że strumienie używają ForkJoinPoollub czy jest to szczegół implementacji? Link do dokumentacji byłby miły.
Nicolai,

6
@Lukas Dzięki za fragment. Dodam, że ForkJoinPoolinstancja powinna być shutdown()wtedy, gdy nie jest już potrzebna, aby uniknąć wycieku nici. (przykład)
jck

5
Zauważ, że w Javie 8 występuje błąd, który chociaż zadania działają na niestandardowej instancji puli, nadal są one połączone z pulą współużytkowaną: wielkość obliczeń pozostaje proporcjonalna do puli wspólnej, a nie puli niestandardowej. Naprawiono w Javie 10: JDK-8190974
Terran

3
@terran Ten problem został również naprawiony dla błędów
Cutberto Ocampo

192

Strumienie równoległe używają wartości domyślnej, ForkJoinPool.commonPoolktóra domyślnie ma o jedenRuntime.getRuntime().availableProcessors() wątek mniejszą liczbę procesorów , co jest zwracane przez (Oznacza to, że równoległe strumienie używają wszystkich procesorów, ponieważ również używają głównego wątku):

W przypadku aplikacji wymagających oddzielnych lub niestandardowych pul, ForkJoinPool może być konstruowany z określonym docelowym poziomem równoległości; domyślnie równa liczbie dostępnych procesorów.

Oznacza to również, że jeśli zagnieżdżono równoległe strumienie lub wiele równoległych strumieni uruchomionych jednocześnie, wszystkie one będą miały tę samą pulę. Zaleta: nigdy nie użyjesz więcej niż domyślna (liczba dostępnych procesorów). Wada: możesz nie przypisać „wszystkich procesorów” do każdego inicjowanego równoległego strumienia (jeśli akurat masz więcej niż jeden). (Najwyraźniej możesz użyć ManagedBlocker, aby to obejść).

Aby zmienić sposób wykonywania równoległych strumieni, możesz albo

  • przesłać wykonanie równoległego strumienia do własnego ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();lub
  • możesz zmienić rozmiar wspólnej puli, używając właściwości systemowych: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")dla docelowej równoległości 20 wątków. Jednak to już nie działa po zaktualizowanej łatce https://bugs.openjdk.java.net/browse/JDK-8190974 .

Przykład tego ostatniego na moim komputerze, który ma 8 procesorów. Jeśli uruchomię następujący program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

Dane wyjściowe to:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Widać więc, że strumień równoległy przetwarza 8 elementów jednocześnie, tzn. Używa 8 wątków. Jeśli jednak odkomentuję skomentowany wiersz, wynikiem jest:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Tym razem strumień równoległy wykorzystał 20 wątków, a wszystkie 20 elementów w strumieniu zostało przetworzonych jednocześnie.


30
W commonPoolrzeczywistości ma jeden mniej niż availableProcessors, co powoduje całkowitą równoległość równą, availableProcessorsponieważ wątek wywołujący liczy się jako jeden.
Marko Topolnik

2
przesłać zwrot ForkJoinTask. Do naśladowania parallel() get()jest potrzebne:stream.parallel().forEach(soSomething)).get();
Grigory Kislin

5
Nie jestem przekonany, że ForkJoinPool.submit(() -> stream.forEach(...))moje działania Stream będą uruchamiane z podanym ForkJoinPool. Oczekiwałbym, że cała akcja Stream jest wykonywana w ForJoinPool jako JEDNA akcja, ale wewnętrznie nadal korzysta z domyślnego / wspólnego ForkJoinPool. Gdzie widziałeś, że ForkJoinPool.submit () zrobiłby to, co mówisz, że działa?
Frederic Leitenberger

@FredericLeitenberger Prawdopodobnie chciałeś umieścić swój komentarz poniżej odpowiedzi Lukasa.
assylias

2
Teraz widzę, że stackoverflow.com/a/34930831/1520422 pokazuje ładnie, że faktycznie działa zgodnie z zapowiedziami. Jednak nadal nie rozumiem JAK to działa. Ale nie mam nic przeciwko „to działa”. Dzięki!
Frederic Leitenberger

39

Alternatywnie do sztuczki polegającej na uruchamianiu obliczeń równoległych w swoim własnym forkJoinPool możesz również przekazać tę pulę do metody CompletableFuture.supplyAsync, jak w:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

22

Oryginalne rozwiązanie (ustawienie wspólnej właściwości równoległości ForkJoinPool) już nie działa. Patrząc na łącza w oryginalnej odpowiedzi, aktualizacja, która je łamie, została ponownie przeniesiona do Javy 8. Jak wspomniano w połączonych wątkach, nie gwarantuje się, że to rozwiązanie będzie działać wiecznie. Na tej podstawie rozwiązaniem jest forkjoinpool.submit z rozwiązaniem .get omówionym w zaakceptowanej odpowiedzi. Myślę, że backport naprawia również niewiarygodność tego rozwiązania.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();

Nie widzę zmiany w równoległości, gdy robię to ForkJoinPool.commonPool().getParallelism()w trybie debugowania.
d-coder

Dzięki. Zrobiłem kilka testów / badań i zaktualizowałem odpowiedź. Wygląda na to, że aktualizacja go zmieniła, ponieważ działa w starszych wersjach.
Tod Casasent

Dlaczego ciągle to otrzymuję: unreported exception InterruptedException; must be caught or declared to be thrownnawet ze wszystkimi catchwyjątkami w pętli.
Rocky Li

Rocky, nie widzę żadnych błędów. Znajomość wersji Java i dokładnej linii pomoże. „InterruptedException” sugeruje, że próba złapania się podczas snu nie jest poprawnie zamknięta w twojej wersji.
Tod Casasent

13

Możemy zmienić domyślną równoległość za pomocą następującej właściwości:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

które można skonfigurować tak, aby korzystało z większej równoległości.


Chociaż jest to ustawienie globalne, działa w celu zwiększenia równoległego
strumienia

Działa to dla mnie w wersji openjdk „1.8.0_222”
abbas

Ta sama osoba jak wyżej, to nie działa dla mnie na openjdk „11.0.6”
abbas

8

Aby zmierzyć rzeczywistą liczbę używanych wątków, możesz sprawdzić Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

Może to wytworzyć na 4-rdzeniowym procesorze wyjście takie jak:

5 // common pool
23 // custom pool

Bez .parallel()tego daje:

3 // common pool
4 // custom pool

6
Thread.activeCount () nie mówi ci, jakie wątki przetwarzają Twój strumień. Zamiast tego zamapuj na Thread.currentThread (). GetName (), a następnie wyraźne (). Wtedy zdasz sobie sprawę, że nie każdy wątek w puli zostanie użyty ... Dodaj opóźnienie do przetwarzania, a wszystkie wątki w puli zostaną wykorzystane.
keyoxy

7

Do tej pory korzystałem z rozwiązań opisanych w odpowiedziach na to pytanie. Teraz stworzyłem małą bibliotekę o nazwie Parallel Stream Support :

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

Ale jak zauważył @PabloMatiasGomez w komentarzach, istnieją mechanizmy podziału mechanizmu równoległych strumieni, które w dużym stopniu zależą od wielkości wspólnej puli. Zobacz Równoległy strumień z zestawu HashSet nie działa równolegle .

Korzystam z tego rozwiązania tylko, aby mieć osobne pule dla różnych rodzajów pracy, ale nie mogę ustawić wielkości wspólnej puli na 1, nawet jeśli go nie używam.



1

Próbowałem niestandardowego ForkJoinPool w następujący sposób, aby dostosować rozmiar puli:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Oto wynik mówiący, że pula używa więcej wątków niż domyślna 4 .

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

Ale tak naprawdę jest dziwne , kiedy próbowałem osiągnąć ten sam wynik, używając ThreadPoolExecutornastępujących metod :

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

ale zawiodłem.

Uruchamia równolegleStream w nowym wątku, a wtedy wszystko inne jest takie samo, co ponownie dowodzi, że parallelStreamużyje ForkJoinPool do uruchomienia wątków potomnych.


Jaki może być powód niedopuszczenia innych wykonawców?
omjego

@omjego To dobre pytanie, być może możesz zacząć nowe pytanie i podać więcej szczegółów w celu opracowania swoich pomysłów;)
Hearen

1

Idź po AbacusUtil . Numer wątku można określić dla strumienia równoległego. Oto przykładowy kod:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Ujawnienie : Jestem programistą AbacusUtil.


1

Jeśli nie chcesz polegać na hakach implementacyjnych, zawsze możesz to osiągnąć, wdrażając niestandardowe kolektory, które będą łączyć mapi collectsemantykę ... i nie będziesz ograniczony do ForkJoinPool:

list.stream()
  .collect(parallelToList(i -> fetchFromDb(i), executor))
  .join()

Na szczęście jest to już zrobione tutaj i dostępne w Maven Central: http://github.com/pivovarit/parallel-collectors

Uwaga: Napisałem to i biorę za to odpowiedzialność.


0

Jeśli nie masz nic przeciwko korzystaniu z biblioteki innej firmy, dzięki cyklopowi- reaguj możesz mieszać sekwencyjne i równoległe strumienie w tym samym potoku i zapewnić niestandardowe ForkJoinPools. Na przykład

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Lub jeśli chcielibyśmy kontynuować przetwarzanie w ramach sekwencyjnego strumienia

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Ujawnienie Jestem wiodącym twórcą Cyclops-reag]


0

Jeśli nie potrzebujesz niestandardowej puli wątków, ale chcesz ograniczyć liczbę jednoczesnych zadań, możesz użyć:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));

(Duplikat pytania o to jest zablokowany, więc proszę, proszę mnie tutaj)


-2

możesz spróbować wdrożyć ten ForkJoinWorkerThreadFactory i wstrzyknąć go do klasy Fork-Join.

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

możesz to zrobić za pomocą tego konstruktora puli Fork-Join.

uwagi: - 1. jeśli użyjesz tego, weź pod uwagę, że w oparciu o implementację nowych wątków, wpłynie to na planowanie z JVM, co ogólnie planuje połączenia wątków z rozwidleniem do różnych rdzeni (traktowanych jako wątek obliczeniowy). 2. Nie ma to wpływu na planowanie zadań przez łączenie wideł do wątków. 3. Naprawdę nie zorientowałem się, w jaki sposób równoległy strumień wybiera wątki ze złączenia wideł (nie mogłem znaleźć na nim odpowiedniej dokumentacji), więc spróbuj użyć innej fabryki wątków, aby się upewnić, czy wątki w równoległym strumieniu są wybierane z dostarczonego przez customThreadFactory. 4. commonThreadPool nie będzie korzystał z tego customThreadFactory.


Czy możesz podać użyteczny przykład, który zademonstruje sposób korzystania z tego, co określiłeś?
J. Murray,
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.