AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Kiedy to napisałem, założyłem, że wątki zostaną odrodzone tylko wywołanie mapy, ponieważ równolegle jest umieszczane po mapie. Ale niektóre wiersze w pliku otrzymywały różne numery rekordów dla każdego wykonania.
Przeczytałem oficjalną dokumentację dotyczącą strumieni Java i kilka stron internetowych, aby zrozumieć, jak działają strumienie pod maską.
Kilka pytań:
Równoległy strumień Java działa w oparciu o SplitIterator , który jest implementowany przez każdą kolekcję, taką jak ArrayList, LinkedList itp. Kiedy konstruujemy równoległy strumień z tych kolekcji, odpowiedni podzielony iterator zostanie użyty do podzielenia i iteracji kolekcji. To wyjaśnia, dlaczego równoległość występowała na poziomie oryginalnego źródła wejściowego (wierszy pliku), a nie na podstawie wyniku mapy (tj. Zapis pojo). Czy moje rozumowanie jest prawidłowe?
W moim przypadku wejściem jest plikowy strumień IO. Który iterator podzielony zostanie użyty?
Nie ma znaczenia, gdzie umieszczamy
parallel()
w rurociągu. Oryginalne źródło wejściowe będzie zawsze podzielone i zostaną zastosowane pozostałe operacje pośrednie.W takim przypadku Java nie powinna umożliwiać użytkownikom wykonywania operacji równoległych w dowolnym miejscu potoku, z wyjątkiem oryginalnego źródła. Ponieważ daje złe zrozumienie dla tych, którzy nie wiedzą, jak działa strumień Java. Wiem, że
parallel()
operacja byłaby zdefiniowana dla typu obiektu Stream, więc działa w ten sposób. Ale lepiej jest podać alternatywne rozwiązanie.W powyższym fragmencie kodu próbuję dodać numer wiersza do każdego rekordu w pliku wejściowym, więc należy go zamówić. Chcę jednak zastosować
doSomeOperation()
równolegle, ponieważ jest to logika ciężka. Jednym ze sposobów na osiągnięcie tego jest napisanie własnego dostosowanego podzielonego iteratora. Czy jest jakiś inny sposób?
Stream
bezpośrednio w interfejsie, a ze względu na fajne kaskadowanie każda operacja jest zwracana Stream
ponownie. Wyobraź sobie, że ktoś chce ci dać, Stream
ale już zastosował kilka podobnych operacji map
. Ty, jako użytkownik, nadal chcesz mieć możliwość decydowania, czy chcesz uruchomić go równolegle, czy nie. Dlatego musi być możliwe, aby parallel()
nadal dzwonić , mimo że strumień już istnieje.
flatMap
lub wykonasz niebezpieczne metody wątków lub podobne.
Path
znajduje się w lokalnym systemie plików i używasz najnowszego JDK, spliterator będzie miał lepsze możliwości przetwarzania równoległego niż grupowanie wielokrotności 1024. Ale zrównoważone dzielenie może nawet przynieść efekt przeciwny do zamierzonego w niektórych findFirst
scenariuszach…
parallel()
jest niczym innym jak ogólnym żądaniem modyfikatora, które jest stosowane do bazowego obiektu strumienia. Pamiętaj, że istnieje tylko jeden strumień źródłowy, jeśli nie zastosujesz końcowych operacji do potoku, tzn. Dopóki nic nie zostanie „wykonane”. To powiedziawszy, w zasadzie kwestionujesz opcje projektowania Java. Który opiera się na opiniach i naprawdę nie możemy w tym pomóc.