Różnica między strumieniami Java 8 a obserwacjami RxJava


144

Czy strumienie Java 8 są podobne do obserwacji RxJava?

Definicja strumienia Java 8:

Klasy w nowym java.util.streampakiecie udostępniają interfejs API Stream do obsługi operacji w stylu funkcjonalnym na strumieniach elementów.


8
Do Twojej wiadomości są propozycje wprowadzenia większej liczby klas typu RxJava w JDK 9. jsr166-concurrency.10961.n7.nabble.com/…
John Vint

@JohnVint Jaki jest status tej propozycji. Czy rzeczywiście odleci?
IgorGanapolsky

2
@IgorGanapolsky O tak, zdecydowanie wygląda na to, że trafi do jdk9. cr.openjdk.java.net/~martin/webrevs/openjdk9/… . Jest nawet port dla RxJava do Flow github.com/akarnokd/RxJavaUtilConcurrentFlow .
John Vint

Wiem, że to naprawdę stare pytanie, ale niedawno uczestniczyłem w tym wspaniałym wykładzie Venkata Subramaniama, który zawiera wnikliwe podejście do tematu i został zaktualizowany do Java9: youtube.com/watch?v=kfSSKM9y_0E . Może być interesujący dla osób zagłębiających się w RxJava.
Pedro,

Odpowiedzi:


152

TL; DR : Wszystkie biblioteki do przetwarzania sekwencji / strumieni oferują bardzo podobne API do budowania potoków. Różnice dotyczą interfejsu API do obsługi wielowątkowości i kompozycji potoków.

RxJava różni się znacznie od Stream. Ze wszystkich rzeczy JDK, najbliższy rx.Observable jest być może kombinacja java.util.stream.Collector Stream + CompletableFuture (co wiąże się z kosztem radzenia sobie z dodatkową warstwą monady, tj. Koniecznością obsługi konwersji między Stream<CompletableFuture<T>>i CompletableFuture<Stream<T>>).

Istnieją znaczące różnice między Observable i Stream:

  • Strumienie są oparte na ściąganiu, a Observables - na zasadzie wypychania. Może się to wydawać zbyt abstrakcyjne, ale ma poważne konsekwencje, które są bardzo konkretne.
  • Stream można wykorzystać tylko raz, Observable można subskrybować wiele razy
  • Stream#parallel()dzieli sekwencję na partycje Observable#subscribeOn()i Observable#observeOn()nie; Trudno jest emulować Stream#parallel()zachowanie z Observable, kiedyś miał .parallel()metodę, ale ta metoda spowodowała tyle zamieszania, że .parallel()wsparcie zostało przeniesione do oddzielnego repozytorium na github, RxJavaParallel. Więcej szczegółów w innej odpowiedzi .
  • Stream#parallel()nie pozwala na określenie puli wątków do użycia, w przeciwieństwie do większości metod RxJava akceptujących opcjonalny harmonogram. Ponieważ wszystkie instancje strumienia w JVM używają tej samej puli łączeń rozwidlonych, dodanie .parallel()może przypadkowo wpłynąć na zachowanie w innym module programu
  • Strumienie brakuje operacje związane z czasem, jak Observable#interval(), Observable#window()i wiele innych; Dzieje się tak głównie dlatego, że strumienie są oparte na ściąganiu i nie ma kontroli nad tym, kiedy emitować następny element w dół
  • Strumienie oferują ograniczony zestaw operacji w porównaniu z RxJava. Np. Strumienie nie mają operacji odcięcia ( takeWhile(), takeUntil()); obejście użycie Stream#anyMatch()jest ograniczone: jest to operacja terminalowa, więc nie możesz jej użyć więcej niż raz na strumień
  • Począwszy od JDK 8, nie ma operacji Stream # zip, co jest czasami bardzo przydatne
  • Strumienie są trudne do skonstruowania samodzielnie, Observable można skonstruować na wiele sposobów EDYCJA: Jak zauważono w komentarzach, istnieją sposoby na skonstruowanie strumienia. Jednakże, ponieważ nie ma zwarcia na terminalu, nie możesz np.łatwo wygenerować Strumień linii w pliku (JDK zapewnia linie Files # i BufferedReader # linie po wyjęciu z pudełka, a innymi podobnymi scenariuszami można zarządzać, konstruując Stream z Iteratora).
  • Observable oferuje funkcję zarządzania zasobami ( Observable#using()); możesz owinąć nim strumień IO lub mutex i mieć pewność, że użytkownik nie zapomni zwolnić zasobu - zostanie on automatycznie usunięty po zakończeniu subskrypcji; Stream ma onClose(Runnable)metodę, ale musisz wywołać ją ręcznie lub za pomocą try-with-resources. Np. musisz pamiętać, że Files # lines () muszą być zawarte w bloku try-with-resources.
  • Observables są synchronizowane przez cały czas (nie sprawdzałem, czy to samo dotyczy strumieni). Dzięki temu nie musisz zastanawiać się, czy podstawowe operacje są bezpieczne dla wątków (odpowiedź zawsze brzmi „tak”, chyba że wystąpi błąd), ale narzut związany ze współbieżnością będzie tam, bez względu na to, czy Twój kod tego potrzebuje, czy nie.

Podsumowanie: RxJava znacznie różni się od strumieni. Prawdziwe alternatywy RxJava to inne implementacje ReactiveStreams , np. Odpowiednia część Akka.

Aktualizuj . Istnieje sztuczka polegająca na użyciu innej niż domyślna puli złącz rozwidlonych Stream#parallel, zobacz niestandardową pulę wątków w strumieniu równoległym Java 8

Aktualizuj . Wszystkie powyższe oparte są na doświadczeniach z RxJava 1.x. Teraz, gdy RxJava 2.x jest tutaj , ta odpowiedź może być nieaktualna.


2
Dlaczego tworzenie strumieni jest trudne? Zgodnie z tym artykułem wydaje się to łatwe: oracle.com/technetwork/articles/java/…
IgorGanapolsky

2
Istnieje wiele klas, które mają metodę „stream”: kolekcje, strumienie wejściowe, pliki katalogów itp. Ale co, jeśli chcesz utworzyć strumień z niestandardowej pętli - powiedzmy, iterując po kursorze bazy danych? Najlepszym sposobem, jaki znalazłem do tej pory, jest utworzenie Iteratora, owinięcie go Spliteratorem i wreszcie wywołanie StreamSupport # fromSpliterator. Za dużo kleju dla prostego etui IMHO. Istnieje również Stream.iterate, ale generuje nieskończony strumień. Jedynym sposobem na odcięcie krzyku w tym przypadku jest Stream # anyMatch, ale jest to operacja terminalowa, więc nie można oddzielić producenta strumienia od konsumenta
Kirill Gamazkov

2
RxJava ma Observable.fromCallable, Observable.create i tak dalej. Lub możesz bezpiecznie wyprodukować nieskończoną liczbę Observable, a następnie powiedzieć `` .takeWhile (condition) '' i możesz
wysłać

1
Strumienie nie są trudne do samodzielnego skonstruowania. Możesz po prostu wywołać Stream.generate()i przekazać własną Supplier<U>implementację, po prostu jedną prostą metodę, z której udostępniasz następny element w strumieniu. Jest mnóstwo innych metod. Aby łatwo skonstruować sekwencję, Streamktóra zależy od poprzednich wartości, możesz użyć interate()metody, każda Collectionma stream()metodę i Stream.of()konstruuje a Streamz varargs lub tablicy. Wreszcie StreamSupportobsługuje bardziej zaawansowane tworzenie strumieni za pomocą rozdzielaczy lub strumieni typów pierwotnych.
jbx

"Strumienie nie mają operacji odcięcia ( takeWhile(), takeUntil());" - JDK9 ma to, jak sądzę, w takeWhile () i dropWhile ()
Abdul

50

Java 8 Stream i RxJava wyglądają całkiem podobnie. Mają podobne operatory (filter, map, flatMap ...), ale nie są zbudowane do tego samego zastosowania.

Możesz wykonywać zadania asynchroniczne za pomocą RxJava.

Dzięki strumieniowi Java 8 będziesz przechodzić przez elementy swojej kolekcji.

Możesz zrobić prawie to samo w RxJava (przechodzenie elementów kolekcji), ale ponieważ RxJava koncentruje się na zadaniu równoległym, ... używa synchronizacji, zatrzasku, ... Więc to samo zadanie z użyciem RxJava może być wolniejsze niż ze strumieniem Java 8.

RxJava można porównać do CompletableFuture, ale to może być w stanie obliczyć więcej niż jedną wartość.


12
Warto zauważyć, że stwierdzenie dotyczące przechodzenia przez strumień jest prawdziwe tylko dla strumienia nierównoległego. parallelStreamobsługuje podobną synchronizację prostych przejść / map / filtrowania itp.
John Vint

2
Nie sądzę, „Więc to samo zadanie przy użyciu RxJava może być wolniejsze niż w przypadku strumienia Java 8”. sprawdza się powszechnie, w dużym stopniu zależy od wykonywanego zadania.
daschl

1
Cieszę się, że powiedziałeś, że to samo zadanie przy użyciu RxJava może być wolniejsze niż w przypadku strumienia Java 8 . Jest to bardzo ważna różnica, której wielu użytkowników RxJava nie jest świadomych.
IgorGanapolsky

RxJava jest domyślnie synchroniczna. Czy masz jakieś punkty odniesienia, które potwierdzają twoje stwierdzenie, że może to być wolniejsze?
Marcin Koziński

6
@ marcin-koziński możesz sprawdzić ten benchmark: twitter.com/akarnokd/status/752465265091309568
dwursteisen

37

Istnieje kilka różnic technicznych i koncepcyjnych, na przykład strumienie Java 8 są synchronicznymi sekwencjami wartości jednorazowego użytku, opartymi na ściąganiu, natomiast obserwowalne RxJava to możliwe do ponownej obserwacji, oparte na adaptacyjnie push-pull, potencjalnie asynchroniczne sekwencje wartości. RxJava jest przeznaczona dla języka Java 6+ i działa również na Androida.


4
Typowy kod wykorzystujący RxJava w dużym stopniu korzysta z lambd, które są dostępne tylko od wersji Java 8. Możesz więc używać Rx z Javą 6, ale kod będzie głośny
Kirill Gamazkov

1
Podobna różnica polega na tym, że obserwowalne Rx mogą pozostać przy życiu przez czas nieokreślony, dopóki nie zostaną wyrejestrowane. Strumienie Java 8 są domyślnie przerywane operacjami.
IgorGanapolsky

2
@KirillGamazkov, możesz użyć retrolambda, aby Twój kod był ładniejszy w przypadku Javy 6.
Marcin Koziński

Kotlin wygląda jeszcze bardziej seksownie niż modernizacja
Kirill Gamazkov

30

Strumienie Java 8 są oparte na ściąganiu. Wykonujesz iterację po strumieniu Java 8 zużywającym każdy element. I mógłby to być niekończący się strumień.

RXJava Observablejest domyślnie oparta na wypychaniu. Subskrybujesz Observable, a otrzymasz powiadomienie, gdy nadejdzie następny element ( onNext) lub kiedy strumień zostanie zakończony ( onCompleted) lub gdy wystąpi błąd ( onError). Ponieważ z Observablepojawić onNext, onCompleted, onErrorimprezy, można zrobić kilka potężnych funkcji, takich jak łączenie różnych Observables na nowy ( zip, merge, concat). Inne rzeczy, które możesz zrobić, to buforowanie, dławienie, ... I używa mniej więcej tego samego API w różnych językach (RxJava, RX w C #, RxJS, ...)

Domyślnie RxJava jest jednowątkowa. O ile nie zaczniesz używać harmonogramów, wszystko będzie się odbywać w tym samym wątku.


w Stream masz forEach, czyli prawie to samo, co onNext
paul

W rzeczywistości strumienie są zwykle terminalami. „Operacje, które zamykają potok strumienia, nazywane są operacjami terminalowymi. Dają wynik z potoku takiego jak List, Integer lub nawet void (dowolny typ inny niż Stream)”. ~ oracle.com/technetwork/articles/java/…
IgorGanapolsky

26

Istniejące odpowiedzi są wyczerpujące i poprawne, ale brakuje jasnego przykładu dla początkujących. Pozwólcie, że przytoczę konkretne terminy, takie jak „oparte na pchaniu / ciągnięciu” i „ponowna obserwacja”. Uwaga : nienawidzę tego terminuObservable (to strumień na litość boską), więc będę po prostu odnosił się do strumieni J8 vs RX.

Rozważ listę liczb całkowitych,

digits = [1,2,3,4,5]

Strumień J8 to narzędzie do modyfikowania kolekcji. Na przykład nawet cyfry można wyodrębnić jako,

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

To jest w zasadzie mapa Pythona , filtrowanie, zmniejszanie , bardzo ładny (i dawno spóźniony) dodatek do Javy. Ale co by było, gdyby cyfry nie były zbierane z wyprzedzeniem - co by było, gdyby cyfry były przesyłane strumieniowo podczas działania aplikacji - czy moglibyśmy filtrować parzyste w czasie rzeczywistym.

Wyobraź sobie, że oddzielny proces wątku wyprowadza liczby całkowite w losowych momentach, gdy aplikacja jest uruchomiona ( ---oznacza czas)

digits = 12345---6------7--8--9-10--------11--12

W RX evenmoże reagować na każdą nową cyfrę i stosować filtr w czasie rzeczywistym

even = -2-4-----6---------8----10------------12

Nie ma potrzeby przechowywania list wejściowych i wyjściowych. Jeśli chcesz mieć listę wyników, nie ma problemu, że można ją również przesyłać strumieniowo. W rzeczywistości wszystko jest strumieniem.

evens_stored = even.collect()  

Dlatego terminy takie jak „bezstanowy” i „funkcjonalny” są bardziej kojarzone z RX


Ale 5 nie jest równe… I wygląda na to, że strumień J8 jest synchroniczny, podczas gdy strumień Rx jest asynchroniczny?
Franklin Yu

1
@FranklinYu dzięki, naprawiłem 5 literówek. Jeśli myślisz mniej w kategoriach synchronicznych vs asynchronicznych, chociaż może to być poprawne, a bardziej w kategoriach trybu rozkazującego vs funkcjonalnego. W J8 najpierw zbierasz wszystkie swoje przedmioty, a następnie stosujesz filtr. W RX definiujesz funkcję filtrującą niezależną od danych, a następnie kojarzysz ją z parzystym źródłem (transmisja na żywo lub kolekcja java) ... to zupełnie inny model programowania
Adam Hughes

Jestem tym bardzo zaskoczony. Jestem prawie pewien, że strumienie Java mogą składać się ze strumieniowego przesyłania danych. Co sprawia, że ​​myślisz odwrotnie?
Vic Seedoubleyew

4

RxJava jest również ściśle związana z inicjatywą strumieni reaktywnych i traktuje ją jako prostą implementację interfejsu API strumieni reaktywnych (np. W porównaniu z implementacją strumieni Akka ). Główna różnica polega na tym, że strumienie reaktywne są zaprojektowane tak, aby były w stanie poradzić sobie z ciśnieniem wstecznym, ale jeśli spojrzysz na stronę strumieni reaktywnych, zobaczysz pomysł. Dość dobrze opisują swoje cele, a strumienie są również ściśle związane z reaktywnym manifestem .

Strumienie Java 8 są w zasadzie implementacją nieograniczonej kolekcji, podobną do Scala Stream lub leniwej sekwencji Clojure .


3

Strumienie Java 8 umożliwiają wydajne przetwarzanie naprawdę dużych kolekcji przy jednoczesnym wykorzystaniu architektur wielordzeniowych. Natomiast RxJava jest domyślnie jednowątkowa (bez harmonogramów). Więc RxJava nie wykorzysta maszyn wielordzeniowych, chyba że sam zaprogramujesz tę logikę.


4
Strumień jest również domyślnie jednowątkowy, chyba że wywołasz .parallel (). Ponadto Rx zapewnia większą kontrolę nad współbieżnością.
Kirill Gamazkov

@KirillGamazkov Kotlin Coroutines Flow (oparty na strumieniach Java8) obsługuje teraz strukturalną współbieżność: kotlinlang.org/docs/reference/coroutines/flow.html#flows
IgorGanapolsky

To prawda, ale nie powiedziałem nic o Flow i współbieżności strukturalnej. Moje dwie uwagi to: 1) zarówno Stream, jak i Rx są jednowątkowe, chyba że wyraźnie to zmienisz; 2) Rx daje ci precyzyjną kontrolę nad tym, który krok ma być wykonywany w jakiej puli nici, w przeciwieństwie do strumieni, które pozwalają tylko powiedzieć „zrób to jakoś równolegle”
Kirill Gamazkov,

Naprawdę nie rozumiem sensu pytania „do czego potrzebujesz puli wątków”. Jak powiedziałeś, „aby umożliwić wydajne przetwarzanie naprawdę dużych kolekcji”. A może chcę, aby część zadania związana z IO była uruchamiana w oddzielnej puli wątków. Myślę, że nie zrozumiałem intencji twojego pytania. Spróbuj ponownie?
Kirill Gamazkov

1
Metody statyczne w klasie Schedulers pozwalają na pobranie predefiniowanych pul wątków, jak również na utworzenie jednej z Executora. Zobacz reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/…
Kirill Gamazkov
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.