Czy strumienie Java 8 są podobne do obserwacji RxJava?
Definicja strumienia Java 8:
Klasy w nowym
java.util.stream
pakiecie udostępniają interfejs API Stream do obsługi operacji w stylu funkcjonalnym na strumieniach elementów.
Czy strumienie Java 8 są podobne do obserwacji RxJava?
Definicja strumienia Java 8:
Klasy w nowym
java.util.stream
pakiecie udostępniają interfejs API Stream do obsługi operacji w stylu funkcjonalnym na strumieniach elementów.
Odpowiedzi:
TL; DR : Wszystkie biblioteki do przetwarzania sekwencji / strumieni oferują bardzo podobne API do budowania potoków. Różnice dotyczą interfejsu API do obsługi wielowątkowości i kompozycji potoków.
RxJava różni się znacznie od Stream. Ze wszystkich rzeczy JDK, najbliższy rx.Observable jest być może kombinacja java.util.stream.Collector Stream + CompletableFuture (co wiąże się z kosztem radzenia sobie z dodatkową warstwą monady, tj. Koniecznością obsługi konwersji między Stream<CompletableFuture<T>>
i CompletableFuture<Stream<T>>
).
Istnieją znaczące różnice między Observable i Stream:
Stream#parallel()
dzieli sekwencję na partycje Observable#subscribeOn()
i Observable#observeOn()
nie; Trudno jest emulować Stream#parallel()
zachowanie z Observable, kiedyś miał .parallel()
metodę, ale ta metoda spowodowała tyle zamieszania, że .parallel()
wsparcie zostało przeniesione do oddzielnego repozytorium na github, RxJavaParallel. Więcej szczegółów w innej odpowiedzi .Stream#parallel()
nie pozwala na określenie puli wątków do użycia, w przeciwieństwie do większości metod RxJava akceptujących opcjonalny harmonogram. Ponieważ wszystkie instancje strumienia w JVM używają tej samej puli łączeń rozwidlonych, dodanie .parallel()
może przypadkowo wpłynąć na zachowanie w innym module programuObservable#interval()
, Observable#window()
i wiele innych; Dzieje się tak głównie dlatego, że strumienie są oparte na ściąganiu i nie ma kontroli nad tym, kiedy emitować następny element w dółtakeWhile()
, takeUntil()
); obejście użycie Stream#anyMatch()
jest ograniczone: jest to operacja terminalowa, więc nie możesz jej użyć więcej niż raz na strumieńObservable#using()
); możesz owinąć nim strumień IO lub mutex i mieć pewność, że użytkownik nie zapomni zwolnić zasobu - zostanie on automatycznie usunięty po zakończeniu subskrypcji; Stream ma onClose(Runnable)
metodę, ale musisz wywołać ją ręcznie lub za pomocą try-with-resources. Np. musisz pamiętać, że Files # lines () muszą być zawarte w bloku try-with-resources.Podsumowanie: RxJava znacznie różni się od strumieni. Prawdziwe alternatywy RxJava to inne implementacje ReactiveStreams , np. Odpowiednia część Akka.
Aktualizuj . Istnieje sztuczka polegająca na użyciu innej niż domyślna puli złącz rozwidlonych Stream#parallel
, zobacz niestandardową pulę wątków w strumieniu równoległym Java 8
Aktualizuj . Wszystkie powyższe oparte są na doświadczeniach z RxJava 1.x. Teraz, gdy RxJava 2.x jest tutaj , ta odpowiedź może być nieaktualna.
Stream.generate()
i przekazać własną Supplier<U>
implementację, po prostu jedną prostą metodę, z której udostępniasz następny element w strumieniu. Jest mnóstwo innych metod. Aby łatwo skonstruować sekwencję, Stream
która zależy od poprzednich wartości, możesz użyć interate()
metody, każda Collection
ma stream()
metodę i Stream.of()
konstruuje a Stream
z varargs lub tablicy. Wreszcie StreamSupport
obsługuje bardziej zaawansowane tworzenie strumieni za pomocą rozdzielaczy lub strumieni typów pierwotnych.
takeWhile()
, takeUntil()
);" - JDK9 ma to, jak sądzę, w takeWhile () i dropWhile ()
Java 8 Stream i RxJava wyglądają całkiem podobnie. Mają podobne operatory (filter, map, flatMap ...), ale nie są zbudowane do tego samego zastosowania.
Możesz wykonywać zadania asynchroniczne za pomocą RxJava.
Dzięki strumieniowi Java 8 będziesz przechodzić przez elementy swojej kolekcji.
Możesz zrobić prawie to samo w RxJava (przechodzenie elementów kolekcji), ale ponieważ RxJava koncentruje się na zadaniu równoległym, ... używa synchronizacji, zatrzasku, ... Więc to samo zadanie z użyciem RxJava może być wolniejsze niż ze strumieniem Java 8.
RxJava można porównać do CompletableFuture
, ale to może być w stanie obliczyć więcej niż jedną wartość.
parallelStream
obsługuje podobną synchronizację prostych przejść / map / filtrowania itp.
Istnieje kilka różnic technicznych i koncepcyjnych, na przykład strumienie Java 8 są synchronicznymi sekwencjami wartości jednorazowego użytku, opartymi na ściąganiu, natomiast obserwowalne RxJava to możliwe do ponownej obserwacji, oparte na adaptacyjnie push-pull, potencjalnie asynchroniczne sekwencje wartości. RxJava jest przeznaczona dla języka Java 6+ i działa również na Androida.
Strumienie Java 8 są oparte na ściąganiu. Wykonujesz iterację po strumieniu Java 8 zużywającym każdy element. I mógłby to być niekończący się strumień.
RXJava Observable
jest domyślnie oparta na wypychaniu. Subskrybujesz Observable, a otrzymasz powiadomienie, gdy nadejdzie następny element ( onNext
) lub kiedy strumień zostanie zakończony ( onCompleted
) lub gdy wystąpi błąd ( onError
). Ponieważ z Observable
pojawić onNext
, onCompleted
, onError
imprezy, można zrobić kilka potężnych funkcji, takich jak łączenie różnych Observable
s na nowy ( zip
, merge
, concat
). Inne rzeczy, które możesz zrobić, to buforowanie, dławienie, ... I używa mniej więcej tego samego API w różnych językach (RxJava, RX w C #, RxJS, ...)
Domyślnie RxJava jest jednowątkowa. O ile nie zaczniesz używać harmonogramów, wszystko będzie się odbywać w tym samym wątku.
Istniejące odpowiedzi są wyczerpujące i poprawne, ale brakuje jasnego przykładu dla początkujących. Pozwólcie, że przytoczę konkretne terminy, takie jak „oparte na pchaniu / ciągnięciu” i „ponowna obserwacja”. Uwaga : nienawidzę tego terminuObservable
(to strumień na litość boską), więc będę po prostu odnosił się do strumieni J8 vs RX.
Rozważ listę liczb całkowitych,
digits = [1,2,3,4,5]
Strumień J8 to narzędzie do modyfikowania kolekcji. Na przykład nawet cyfry można wyodrębnić jako,
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
To jest w zasadzie mapa Pythona , filtrowanie, zmniejszanie , bardzo ładny (i dawno spóźniony) dodatek do Javy. Ale co by było, gdyby cyfry nie były zbierane z wyprzedzeniem - co by było, gdyby cyfry były przesyłane strumieniowo podczas działania aplikacji - czy moglibyśmy filtrować parzyste w czasie rzeczywistym.
Wyobraź sobie, że oddzielny proces wątku wyprowadza liczby całkowite w losowych momentach, gdy aplikacja jest uruchomiona ( ---
oznacza czas)
digits = 12345---6------7--8--9-10--------11--12
W RX even
może reagować na każdą nową cyfrę i stosować filtr w czasie rzeczywistym
even = -2-4-----6---------8----10------------12
Nie ma potrzeby przechowywania list wejściowych i wyjściowych. Jeśli chcesz mieć listę wyników, nie ma problemu, że można ją również przesyłać strumieniowo. W rzeczywistości wszystko jest strumieniem.
evens_stored = even.collect()
Dlatego terminy takie jak „bezstanowy” i „funkcjonalny” są bardziej kojarzone z RX
RxJava jest również ściśle związana z inicjatywą strumieni reaktywnych i traktuje ją jako prostą implementację interfejsu API strumieni reaktywnych (np. W porównaniu z implementacją strumieni Akka ). Główna różnica polega na tym, że strumienie reaktywne są zaprojektowane tak, aby były w stanie poradzić sobie z ciśnieniem wstecznym, ale jeśli spojrzysz na stronę strumieni reaktywnych, zobaczysz pomysł. Dość dobrze opisują swoje cele, a strumienie są również ściśle związane z reaktywnym manifestem .
Strumienie Java 8 są w zasadzie implementacją nieograniczonej kolekcji, podobną do Scala Stream lub leniwej sekwencji Clojure .
Strumienie Java 8 umożliwiają wydajne przetwarzanie naprawdę dużych kolekcji przy jednoczesnym wykorzystaniu architektur wielordzeniowych. Natomiast RxJava jest domyślnie jednowątkowa (bez harmonogramów). Więc RxJava nie wykorzysta maszyn wielordzeniowych, chyba że sam zaprogramujesz tę logikę.