Patrzyłem na nową rx java 2 i nie jestem do końca pewien, czy rozumiem już ideę backpressure
...
Zdaję sobie sprawę, że my Observable
nie mamy backpressure
wsparcia i Flowable
to je ma.
Więc na podstawie przykładu powiedzmy, że mam flowable
z interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
To się zawiesza po około 128 wartościach i jest to całkiem oczywiste, że konsumuję wolniej niż zdobywanie przedmiotów.
Ale mamy to samo z Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
To wcale się nie zawiesi, nawet jeśli opóźnię spożycie, nadal działa. Aby Flowable
pracować, powiedzmy, że wstawię onBackpressureDrop
operator, awaria zniknęła, ale nie wszystkie wartości są również emitowane.
Więc podstawowe pytanie, na które nie mogę znaleźć odpowiedzi w mojej głowie, brzmi: dlaczego mam się przejmować, backpressure
kiedy mogę używać zwykłego Observable
nadal odbierać wszystkie wartości bez zarządzania buffer
? A może z drugiej strony, jakie korzyści backpressure
dają mi korzyści z zarządzania i obsługi konsumpcji?