Sekwencja RxJS równoważna obietnicy.then ()?


84

Kiedyś dużo się rozwijałem z obietnicą, a teraz przechodzę do RxJS. Dokumentacja RxJS nie dostarcza jasnego przykładu, jak przejść od łańcucha obietnic do sekwencji obserwatorów.

Na przykład zazwyczaj piszę łańcuch obietnic z wieloma krokami, jak np

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

Jak mam przepisać ten łańcuch obietnic w stylu RxJS?

Odpowiedzi:


81

Dla przepływu danych (odpowiednik then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

Obietnicę można przekształcić w obserwowalną za pomocą Rx.Observable.fromPromise.

Niektórzy operatorzy obiecujący mają bezpośrednie tłumaczenie. Na przykład RSVP.alllub jQuery.whenmożna go zastąpić Rx.Observable.forkJoin.

Pamiętaj, że masz kilka operatorów, które pozwalają na asynchroniczne przekształcanie danych i wykonywanie zadań, których nie możesz lub byłoby bardzo trudne do wykonania z obietnicami. Rxjs ujawnia wszystkie swoje moce za pomocą asynchronicznych sekwencji danych (sekwencja tj. Więcej niż 1 asynchroniczna wartość).

W przypadku zarządzania błędami temat jest nieco bardziej złożony.

  • są też haczyki i wreszcie operatorzy
  • retryWhen może również pomóc w powtórzeniu sekwencji w przypadku błędu
  • możesz także poradzić sobie z błędami samego subskrybenta za pomocą onErrorfunkcji.

Aby uzyskać dokładną semantykę, przyjrzyj się dokładniej dokumentacji i przykładom, które można znaleźć w Internecie, lub zadaj szczegółowe pytania tutaj.

Z pewnością byłby to dobry punkt wyjścia do głębszego zarządzania błędami za pomocą Rxjs: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html


Zawsze widzę, że obserwowalna sekwencja kończy się na subscribe (). Ponieważ jest to tylko funkcja obserwowalnego obiektu, czy jest jakiś powód, aby to robić? Czy to funkcja, aby rozpocząć sekwencję?
Haoliang Yu

dokładnie tak. Jeśli nie ma obserwatorów przekazanych przez subskrypcję, twój obserwowalny nie wyemituje żadnych danych, więc nie zobaczysz żadnego przepływu danych.
user3743222

7
Polecam zajrzeć na to: gist.github.com/staltz/868e7e9bc2a7b8c1f754 . To mogłoby być przyjemniejsze niż oficjalny dokument.
user3743222

3
Promise.thenjest raczej .flatMapniż .map.
Tamas Hegedus

1
FYI nie jest to dokładnie równoważne, ponieważ w Promisewersji błędy z 3. thenzostałyby przechwycone przez catch. Tutaj ich nie ma.
mik01aj

35

Bardziej nowoczesna alternatywa:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

Zwróć też uwagę, że aby to wszystko działało, musisz gdzieś subscribeto podłączyć Observable, ale zakładam, że jest to obsługiwane w innej części aplikacji.


Jestem bardzo nowy w RxJS, ale biorąc pod uwagę, że mamy tutaj do czynienia tylko z początkowym strumieniem jednego zdarzenia i mergeMap()dlatego tak naprawdę nie ma nic do scalenia , uważam, że moglibyśmy osiągnąć dokładnie to samo w tym przypadku, używając concatMap()lub switchMap(). Czy mam to poprawne ...?
Dan King

8

Zaktualizuj maj 2019, używając RxJs 6

Zgadzam się z odpowiedziami podanymi powyżej, chciałem dodać konkretny przykład z niektórymi danymi zabawek i prostymi obietnicami (z setTimeout) używając RxJs v6, aby zwiększyć przejrzystość.

Po prostu zaktualizuj przekazany identyfikator (obecnie zakodowany na stałe jako 1) do czegoś, co nie istnieje, aby wykonać również logikę obsługi błędów. Co ważne, zwróć także uwagę na użycie ofwith catchErrormessage.

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of(`Caught error: ${error}`))
);

source$.subscribe(console.log);

Dane wyjściowe:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

Kluczowa część jest równoważna z następującym przy użyciu prostego przepływu kontroli obietnicy:

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });

1

Jeśli dobrze zrozumiałem, masz na myśli konsumpcję wartości, w takim przypadku używasz sbuscribe tj

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

Dodatkowo możesz po prostu zmienić to, co obserwowalne, w obietnicę za pomocą toPromise (), jak pokazano:

arrObservable.toPromise().then()

0

jeśli getPromisefunkcja jest w środku rury strumienia zalecana prosty owinąć go do jednej z funkcji mergeMap, switchMaplub concatMap(najczęściej mergeMap):

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

jeśli chcesz rozpocząć swój strumień, getPromise()zawiń go w fromfunkcję:

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);

0

O ile właśnie się dowiedziałem, jeśli zwracasz wynik w flatMap, konwertuje go na Array, nawet jeśli zwróciłeś ciąg.

Ale jeśli zwrócisz Observable, ten obserowalny może zwrócić ciąg;


0

Tak to zrobiłem.

Poprzednio

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
    const request = gapi.client.people.people.connections.list({
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    }).then(response => {
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    });
  }

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      // handle rsp;
  });

Po (l?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
    return from(
      new Promise((resolve, reject) => {
        gapi.client.people.people.connections.list({
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        }).then(result => {
          resolve(result);
        });
      })
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    }));
  }

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
  // handle rsp
}), (error) => {
  // handle error
});

efekt uboczny : wykrywanie zmian również zaczęło działać po przekształceniu wywołania zwrotnego w obserwowalny .
Anand Rockzz

0

Sekwencja RxJS równoważna obietnicy.then ()?

Na przykład

function getdata1 (argument) {
        return this.http.get(url)
            .map((res: Response) => res.json());
    }

    function getdata2 (argument) {
        return this.http.get(url)
            .map((res: Response) => res.json());
    }

    getdata1.subscribe((data1: any) => {
        console.log("got data one. get data 2 now");
        getdata2.subscribe((data2: any) => {
            console.log("got data one and two here");
        });
    });
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.