Połącz listę Observables i poczekaj, aż wszystko zostanie ukończone


91

TL; DR Jak przekonwertować Task.whenAll(List<Task>)na RxJava?

Mój istniejący kod używa Bolts do tworzenia listy zadań asynchronicznych i czeka, aż wszystkie te zadania zostaną zakończone, zanim wykonają inne kroki. Zasadniczo tworzy List<Task>i zwraca pojedynczy znak, Taskktóry jest oznaczony jako zakończony, gdy wszystkie zadania na liście zostaną ukończone, jak na przykładzie w witrynie Bolts .

Szukam zastąpić Boltsz RxJavaa ja zakładając tę metodę budowania listę zadań asynchronicznych (rozmiar nie wiadomo z góry) i owijając je wszystkie w jeden Observablejest możliwe, ale nie wiem jak.

Próbowałem patrząc na merge, zip, concatetc ... ale nie może dostać się do pracy, na List<Observable>które będę budowania jak wszystkie one wydają dostosowane do pracy na dwóch Observablesnaraz, jeśli dobrze rozumiem docs.

Próbuję się uczyć RxJavai nadal jestem bardzo nowy w tym, więc wybacz mi, jeśli to jest oczywiste pytanie lub wyjaśnione gdzieś w dokumentach; Próbowałem szukać. Każda pomoc byłaby mile widziana.

Odpowiedzi:


73

Wygląda na to, że szukasz operatora Zip .

Można go używać na kilka różnych sposobów, więc spójrzmy na przykład. Powiedzmy, że mamy kilka prostych obserwacji różnych typów:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

Najprostszym sposobem, aby poczekać na nich wszystkich, jest coś takiego:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Zauważ, że w funkcji zip parametry mają konkretne typy, które odpowiadają typom spakowanych obserwabli.

Spakowanie listy obserwabli jest również możliwe, albo bezpośrednio:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

... lub zawijając listę w Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

Jednak w obu tych przypadkach funkcja zip może zaakceptować tylko jeden Object[]parametr, ponieważ typy obserwowalnych na liście nie są znane z góry, a także ich liczba. Oznacza to, że funkcja zip musiałaby sprawdzić liczbę parametrów i odpowiednio je przesłać.

Niezależnie od tego, wszystkie powyższe przykłady zostaną ostatecznie wydrukowane 1 Blah true

EDYCJA: Korzystając z Zip, upewnij się, że Observableswszystkie spakowane pliki emitują taką samą liczbę elementów. W powyższych przykładach wszystkie trzy obserwable emitowały pojedynczą pozycję. Gdybyśmy mieli je zmienić na coś takiego:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Wtedy 1, Blah, Truei 2, Hello, Truebyłyby jedynymi elementami przekazanymi do funkcji zip. Element 3nigdy nie zostałby spakowany, ponieważ inne obserwowalne zostały zakończone.


9
To nie zadziała, jeśli jedno z połączeń zakończy się niepowodzeniem. W takim przypadku wszystkie połączenia zostaną utracone.
StarWind0

1
@ StarWind0 możesz pominąć błąd przez użycie onErrorResumeNext, przykład:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
vuhung3990

A jeśli mam 100 obserwabli?
Krzysztof Kubicki

79

Możesz użyć flatMapw przypadku, gdy masz dynamiczną kompozycję zadań. Coś takiego:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplemete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Kolejny dobry przykład wykonywania równoległego

Uwaga: tak naprawdę nie znam Twoich wymagań dotyczących obsługi błędów. Na przykład co zrobić, jeśli tylko jedno zadanie się nie powiedzie. Myślę, że powinieneś zweryfikować ten scenariusz.


16
To powinna być akceptowana odpowiedź, biorąc pod uwagę, że pytanie brzmi: „kiedy wszystkie zadania z listy zostaną ukończone”. zippowiadamia o zakończeniu, gdy tylko jedno z zadań zostało wykonane i tym samym nie ma zastosowania.
user3707125

1
@MyDogTom: Czy możesz zaktualizować odpowiedź za pomocą wersji składni Java7 (nie lambda)?
sanedroid

3
@PoojaGaikwad Dzięki lambdzie jest bardziej czytelny. Wystarczy zamienić pierwszą new Func1<Observable<Boolean>, Observable<Boolean>>()...new Func1<List<Boolean>, Boolean>()
lambdę na,

@soshial RxJava 2 to najgorsza rzecz, jaka kiedykolwiek przydarzyła się RxJava, tak
egorikem

15

Spośród proponowanych sugestii zip () w rzeczywistości łączy ze sobą obserwowalne wyniki, które mogą być pożądane lub nie, ale nie zostały zadane w pytaniu. W pytaniu chodziło tylko o wykonanie każdej z operacji, pojedynczo lub równolegle (co nie zostało określone, ale połączony przykład Boltsa dotyczył wykonywania równoległego). Ponadto, zip () zakończy się natychmiast po ukończeniu któregokolwiek z obserwacji, więc narusza wymagania.

W przypadku równoległego wykonywania Observables, flatMap () przedstawiona w drugiej odpowiedzi jest w porządku, ale metoda merge () byłaby prostsza . Zauważ, że scalanie zakończy się po błędzie któregokolwiek z Observables, jeśli wolisz odłożyć zakończenie do zakończenia wszystkich obserwabli, powinieneś spojrzeć na mergeDelayError () .

Myślę, że w przypadku jednego po drugim należy użyć metody statycznej Observable.concat () . Jego javadoc stwierdza w ten sposób:

concat (java.lang.Iterable> sequences) Spłaszcza Iterable of Observables w jedną Observable, jedną po drugiej, bez przeplatania ich

co brzmi jak to, czego szukasz, jeśli nie chcesz równoległego wykonywania.

Ponadto, jeśli interesuje Cię tylko wykonanie zadania, a nie zwracanie wartości, prawdopodobnie powinieneś zajrzeć do Completable zamiast Observable .

TLDR: myślę, że do wykonywania zadań jeden po drugim i zdarzenia oncompletion po ich zakończeniu, najlepiej nadaje się Completable.concat (). W przypadku wykonywania równoległego Completable.merge () lub Completable.mergeDelayError () brzmi jak rozwiązanie. Pierwsza z nich zatrzyma się natychmiast po wystąpieniu błędu na jakimkolwiek możliwym do ukończenia, druga wykona je wszystkie, nawet jeśli jeden z nich ma błąd, i dopiero wtedy zgłosi błąd.


2

Prawdopodobnie spojrzałeś na zipoperator, który działa z 2 Observables.

Istnieje również metoda statyczna Observable.zip. Ma jedną formę, która powinna Ci się przydać:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Możesz sprawdzić javadoc, aby uzyskać więcej.


2

Z Kotlinem

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

Ważne jest, aby ustawić typ argumentów funkcji, w przeciwnym razie wystąpią błędy kompilacji

Ostatni typ argumentu zmienia się wraz z liczbą argumentów: BiFunction dla 2 Funkcja3 dla 3 Funkcja4 dla 4 ...


1

Piszę trochę kodu obliczeniowego w Kotlin z JavaRx Observables i RxKotlin. Chcę obserwować listę obserwowalnych do uzupełnienia, a w międzyczasie informować mnie o postępach i najnowszych wynikach. Na koniec zwraca najlepszy wynik obliczeń. Dodatkowym wymaganiem było równoległe uruchomienie Observables w celu wykorzystania wszystkich moich rdzeni procesora. Skończyło się na tym rozwiązaniu:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}

nie znasz RxKotlin lub @Volatile, ale jak by to działało, gdyby było wywoływane przez kilka wątków w tym samym czasie? Co by się stało z wynikami?
eis

0

Miałem podobny problem, musiałem pobrać elementy wyszukiwania z rozmowy resztowej, a także zintegrować zapisane sugestie z RecentSearchProvider.AUTHORITY i połączyć je razem w jedną ujednoliconą listę. Próbowałem użyć rozwiązania @MyDogTom, niestety nie ma Observable.from w RxJava. Po kilku badaniach uzyskałem rozwiązanie, które działało dla mnie.

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

Stworzyłem obserwowalny z tablicy obserwabli, która zawiera listy sugestii i wyników z Internetu w zależności od zapytania. Następnie wystarczy przejrzeć te zadania za pomocą flatMapIterable i uruchomić je za pomocą flatmap, a wyniki umieścić w tablicy, którą można później pobrać do widoku recyklingu.


0

Jeśli używasz programu Project Reactor, możesz użyć Mono.when.

Mono.when(publisher1, publisher2)
.map(i-> {
    System.out.println("everything is done!");
    return i;
}).block()
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.