Jaki jest prawidłowy sposób udostępniania wyniku połączenia sieciowego Angular Http w RxJs 5?


303

Korzystając z protokołu HTTP, wywołujemy metodę, która wykonuje połączenie sieciowe i zwraca obserwowalny http:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

Jeśli weźmiemy to pod uwagę i dodamy do niej wielu subskrybentów:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

Chcemy mieć pewność, że nie spowoduje to wielu żądań sieciowych.

Może się to wydawać niezwykłym scenariuszem, ale jest tak naprawdę dość powszechny: na przykład, jeśli wywołujący subskrybuje obserwowalny, aby wyświetlić komunikat o błędzie i przekazuje go do szablonu za pomocą potoku asynchronicznego, mamy już dwóch subskrybentów.

Jak to zrobić w RxJs 5?

Mianowicie wydaje się to działać dobrze:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

Ale czy jest to idiomatyczny sposób robienia tego w RxJs 5, czy zamiast tego powinniśmy zrobić coś innego?

Uwaga: Zgodnie z kątowym 5 Nowy HttpClientThe .map(res => res.json())część we wszystkich przykładach jest teraz bezużyteczne, jako wynik JSON jest teraz zakłada domyślnie.


1
> udział jest identyczny jak opublikuj (). refCount (). Właściwie to nie jest. Zobacz następującą dyskusję: github.com/ReactiveX/rxjs/issues/1363
Christian

1
edytowane pytanie, zgodnie z problemem wygląda na to, że dokumenty w kodzie muszą zostać zaktualizowane -> github.com/ReactiveX/rxjs/blob/master/src/operator/share.ts
Angular University

Myślę, że „to zależy”. Ale w przypadku wywołań, w których nie można buforować danych lokalnie b / c, może to nie mieć sensu ze względu na zmianę parametrów / kombinacji .share () wydaje się absolutnie właściwą rzeczą. Ale jeśli możesz buforować rzeczy lokalnie, niektóre inne odpowiedzi dotyczące ReplaySubject / BehaviorSubject są również dobrym rozwiązaniem.
JimB

Myślę, że nie tylko potrzebujemy buforować dane, ale także aktualizować / modyfikować dane buforowane. To częsty przypadek. Na przykład, jeśli chcę dodać nowe pole do modelu buforowanego lub zaktualizować wartość pola. Może utworzyć Singleton DataCacheService z CRUD metody jest lepszy sposób? Jak sklep z Redux . Co myślisz?
slideshowp2

Możesz po prostu użyć pamięci podręcznej ngx ! To lepiej pasuje do twojego scenariusza. Zobacz moją odpowiedź poniżej
Tushar Walzade,

Odpowiedzi:


230

Buforuj dane i jeśli są dostępne w pamięci podręcznej, zwróć to, w przeciwnym razie prześlij żądanie HTTP.

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';

@Injectable()
export class DataService {
  private url: string = 'https://cors-test.appspot.com/test';

  private data: Data;
  private observable: Observable<any>;

  constructor(private http: Http) {}

  getData() {
    if(this.data) {
      // if `data` is available just return it as `Observable`
      return Observable.of(this.data); 
    } else if(this.observable) {
      // if `this.observable` is set then the request is in progress
      // return the `Observable` for the ongoing request
      return this.observable;
    } else {
      // example header (not necessary)
      let headers = new Headers();
      headers.append('Content-Type', 'application/json');
      // create the request, store the `Observable` for subsequent subscribers
      this.observable = this.http.get(this.url, {
        headers: headers
      })
      .map(response =>  {
        // when the cached data is available we don't need the `Observable` reference anymore
        this.observable = null;

        if(response.status == 400) {
          return "FAILURE";
        } else if(response.status == 200) {
          this.data = new Data(response.json());
          return this.data;
        }
        // make it shared so more than one subscriber can get the result
      })
      .share();
      return this.observable;
    }
  }
}

Przykład plunkera

Ten artykuł https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html to świetne wyjaśnienie dotyczące sposobu buforowania shareReplay.


3
do()w przeciwieństwie do map()nie modyfikuje zdarzenia. Możesz również użyć, map()ale musisz upewnić się, że na końcu wywołania zwrotnego zwracana jest poprawna wartość.
Günter Zöchbauer

3
Jeśli strona wywołująca, która robi .subscribe()to, nie potrzebuje wartości, możesz to zrobić, ponieważ może się ona po prostu dostać null(w zależności od tego, co this.extractDatazwraca), ale IMHO nie wyraża dobrze intencji kodu.
Günter Zöchbauer

2
Kiedy this.extraDatakończy się extraData() { if(foo) { doSomething();}}inaczej, zwracany jest wynik ostatniego wyrażenia, które może nie być tym, czego chcesz.
Günter Zöchbauer

9
@ Günter, dziękuję za kod, działa. Staram się jednak zrozumieć, dlaczego osobno śledzisz dane i można je obserwować. Czy nie osiągnąłbyś skutecznie tego samego efektu, buforując tylko Obserwowalne <Data> w ten sposób? if (this.observable) { return this.observable; } else { this.observable = this.http.get(url) .map(res => res.json().data); return this.observable; }
Lipiec. Tech

3
@HarleenKaur Jest to klasa, do której otrzymany JSON jest przekształcany w postaci szeregowej w celu uzyskania silnego sprawdzania typu i autouzupełniania. Nie trzeba go używać, ale jest to powszechne.
Günter Zöchbauer

44

Według sugestii @Cristian jest to jeden ze sposobów, który działa dobrze dla obserwowalnych HTTP, które emitują tylko raz, a następnie wykonują:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}

Istnieje kilka problemów z użyciem tego podejścia - zwróconego obserwowalnego nie można anulować ani ponowić. To może nie być dla ciebie problem, ale z drugiej strony może. Jeśli jest to problem, shareoperator może być rozsądnym wyborem (choć w przypadku niektórych nieprzyjemnych przypadków krawędziowych). Dla głębokiego nurkowania dyskusji na temat opcji zobaczyć komentarze sekcję w tym blogu: blog.jhades.org/...
Christian

1
Małe wyjaśnienie ... Mimo że publishLast().refCount()nie można anulować udostępniania obserwowalnego źródła, nie można go anulować, ale po anulowaniu wszystkich subskrypcji zwróconego obserwowalnego refCount, efektem netto jest obserwowanie źródła, które nie będzie subskrybowane, anulując je, jeśli to w miejscu „inflight”
Christian

@Christian Hej, czy możesz wyjaśnić, co masz na myśli mówiąc „nie można anulować ani spróbować ponownie”? Dzięki.
niezdefiniowany

37

AKTUALIZACJA: Ben Lesh mówi, że w następnej wersji mniejszej po 5.2.0 będziesz mógł po prostu wywołać shareReplay (), aby naprawdę buforować.

POPRZEDNIO.....

Po pierwsze, nie używaj share () ani publiceplay (1) .refCount (), są one takie same, a problemem jest to, że udostępnia tylko wtedy, gdy połączenia są nawiązywane, gdy obserwowalne jest aktywne, jeśli łączysz się po jego zakończeniu. , ponownie tworzy nowy obserwowalny, tłumaczenie, a nie buforowanie.

Birowski podał powyższe właściwe rozwiązanie, którym jest użycie ReplaySubject. ReplaySubject zbuforuje wartości, które mu podasz (bufferSize) w naszym przypadku 1. Nie utworzy nowego obserwowalnego, takiego jak share (), gdy refCount osiągnie zero i nawiążesz nowe połączenie, co jest właściwym zachowaniem do buforowania.

Oto funkcja wielokrotnego użytku

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

Oto jak z niego korzystać

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

Poniżej znajduje się bardziej zaawansowana wersja funkcji pamięci podręcznej. Ta umożliwia własną tabelę wyszukiwania + możliwość zapewnienia niestandardowej tabeli wyszukiwania. W ten sposób nie musisz sprawdzać this._cache jak w powyższym przykładzie. Zauważ również, że zamiast przekazywać obserwowalny jako pierwszy argument, przekazujesz funkcję, która zwraca obserwowalne, to dlatego, że Http Angulara wykonuje się od razu, więc zwracając leniwie wykonaną funkcję, możemy zdecydować, aby nie wywoływać jej, jeśli jest już w nasza pamięć podręczna.

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

Stosowanie:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

Czy jest jakiś powód, aby nie korzystać z tego rozwiązania jako operator RxJs: const data$ = this._http.get('url').pipe(cacheable()); /*1st subscribe*/ data$.subscribe(); /*2nd subscribe*/ data$.subscribe();? Zachowuje się więc bardziej jak każdy inny operator ...
Felix

31

rxjs 5.4.0 ma nową metodę shareReplay .

Autor wyraźnie mówi „idealny do obsługi takich rzeczy, jak buforowanie wyników AJAX”

rxjs PR # 2443 feat (shareReplay): dodaje shareReplaywariantpublishReplay

shareReplay zwraca obserwowalny, który jest źródłem multiemisji na ReplaySubject. Ten temat powtórki jest przetwarzany ponownie w przypadku błędu ze źródła, ale nie po zakończeniu źródła. To sprawia, że ​​shareReplay jest idealny do obsługi takich zadań, jak buforowanie wyników AJAX, ponieważ można je ponowić. Jego zachowanie powtarzalne różni się jednak od udziału tym, że nie powtórzy obserwowalnego źródła, a raczej powtórzy wartości obserwowalnego źródła.


Czy to się z tym wiąże? Te dokumenty pochodzą jednak z 2014 roku. github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…
Aaron Hoffman

4
Próbowałem dodać .shareReplay (1, 10000) do obserwowalnego, ale nie zauważyłem żadnej zmiany pamięci podręcznej ani zmiany zachowania. Czy jest dostępny działający przykład?
Aydus-Matthew

Patrząc na dziennik zmian github.com/ReactiveX/rxjs/blob/... Pojawił się wcześniej, został usunięty w wersji 5, dodany ponownie w wersji 5.4 - ten link rx-book odnosi się do wersji 4, ale istnieje w aktualnej wersji LTS w wersji 5.5.6 jest w wersji 6. Wyobrażam sobie, że link rx-book jest nieaktualny.
Jason Awbrey,

25

zgodnie z tym artykułem

Okazuje się, że możemy z łatwością dodać buforowanie do obserwowalnego, dodając publikację PublReplay (1) i refCount.

więc w środku, jeśli po prostu dołączą instrukcje

.publishReplay(1)
.refCount();

do .map(...)


11

Wersja rxjs 5.4.0 (2017-05-09) dodaje obsługę shareReplay .

Dlaczego warto korzystać z shareReplay?

Zasadniczo chcesz korzystać z shareReplay, gdy masz skutki uboczne lub opodatkowanie obliczeń, których nie chcesz wykonywać wśród wielu subskrybentów. Może być również cenny w sytuacjach, gdy wiesz, że będziesz mieć późnych subskrybentów strumienia, który potrzebuje dostępu do wcześniej emitowanych wartości. Ta zdolność do odtwarzania wartości w subskrypcji jest tym, co odróżnia udostępnianie i udostępnianie.

Możesz łatwo zmodyfikować usługę kątową, aby z niej skorzystać i zwrócić obserwowalny wynik z pamięci podręcznej, który spowoduje, że połączenie http zostanie wykonane tylko raz (zakładając, że pierwsze połączenie zakończyło się powodzeniem).

Przykładowa usługa kątowa

Oto bardzo prosta obsługa klienta, z której korzysta shareReplay.

customer.service.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }

    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}

Zauważ, że przypisanie do konstruktora można przenieść do metody, getCustomersale ponieważ obserwowalne elementy zwracane HttpClientsą „zimne”, robienie tego w konstruktorze jest dopuszczalne, ponieważ wywołanie http zostanie wykonane tylko za pierwszym razem subscribe.

Zakłada się tutaj również, że początkowe zwrócone dane nie stają się przestarzałe w czasie życia instancji aplikacji.


Bardzo podoba mi się ten wzorzec i chcę go wdrożyć we wspólnej bibliotece usług interfejsu API, z których korzystam w wielu aplikacjach. Jednym z przykładów jest UserService i wszędzie oprócz kilku miejsc nie trzeba unieważniać pamięci podręcznej w trakcie życia aplikacji, ale w takich przypadkach, jak mam ją unieważnić bez powodowania osierocenia poprzednich subskrypcji?
SirTophamHatt

10

Zagrałem gwiazdką w pytanie, ale spróbuję spróbować.

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

Oto dowód :)

Jest tylko jedna na wynos: getCustomer().subscribe(customer$)

Nie subskrybujemy odpowiedzi interfejsu API getCustomer(), subskrybujemy replaySubject, który jest możliwy do zaobserwowania, który może również subskrybować inny obserwowalny i (i to jest ważne) utrzymać ostatnią emitowaną wartość i ponownie opublikować dowolną z nich (ReplaySubject's ) subskrybentów.


1
Podoba mi się to podejście, ponieważ dobrze wykorzystuje rxjs i nie trzeba dodawać niestandardowej logiki, dziękuję
Thibs

7

Znalazłem sposób na przechowanie wyniku HTTP get w sessionStorage i wykorzystanie go do sesji, aby nigdy więcej nie zadzwonił do serwera.

Użyłem go do wywołania API github, aby uniknąć ograniczenia użycia.

@Injectable()
export class HttpCache {
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    let cached: any;
    if (cached === sessionStorage.getItem(url)) {
      return Observable.of(JSON.parse(cached));
    } else {
      return this.http.get(url)
        .map(resp => {
          sessionStorage.setItem(url, resp.text());
          return resp.json();
        });
    }
  }
}

Do Twojej wiadomości limit sessionStorage to 5M (lub 4,75M). Dlatego nie należy go tak używać w przypadku dużego zestawu danych.

------ edytuj -------------
Jeśli chcesz odświeżyć dane za pomocą F5, który wykorzystuje dane pamięci zamiast sessionStorage;

@Injectable()
export class HttpCache {
  cached: any = {};  // this will store data
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    if (this.cached[url]) {
      return Observable.of(this.cached[url]));
    } else {
      return this.http.get(url)
        .map(resp => {
          this.cached[url] = resp.text();
          return resp.json();
        });
    }
  }
}

Jeśli będziesz przechowywać w pamięci sesji, to w jaki sposób upewnisz się, że pamięć sesji zostanie zniszczona po opuszczeniu aplikacji?
Gags

ale wprowadza to nieoczekiwane zachowanie dla użytkownika. Gdy użytkownik naciśnie klawisz F5 lub przycisk odświeżania przeglądarki, oczekuje nowych danych z serwera. Ale tak naprawdę dostaje przestarzałe dane z localStorage. Raporty o błędach, zgłoszenia do pomocy technicznej itp. Przychodzące ... Jak sama nazwa sessionStoragewskazuje, użyłbym tego tylko dla danych, które powinny być spójne dla całej sesji.
Martin Schneider

@ MA-Maddin, jak powiedziałem: „Użyłem go, aby uniknąć limitu użytkowania”. Jeśli chcesz, aby dane były odświeżane za pomocą F5, musisz użyć pamięci zamiast sessionStorage. Odpowiedź została zredagowana przy użyciu tego podejścia.
allenhwkim

tak, to może być przypadek użycia. Właśnie mnie uruchomiono, ponieważ wszyscy mówią o pamięci podręcznej, a OP ma getCustomerw swoim przykładzie. ;) Więc chciałem tylko ostrzec ppl, że może nie dostrzegać ryzyka :)
Martin Schneider

5

Wybrana implementacja będzie zależała od tego, czy chcesz anulować subskrypcję (), aby anulować żądanie HTTP, czy nie.

W każdym razie dekoratory TypeScript są dobrym sposobem na ustandaryzowanie zachowania. Oto ten, który napisałem:

  @CacheObservableArgsKey
  getMyThing(id: string): Observable<any> {
    return this.http.get('things/'+id);
  }

Definicja dekoratora:

/**
 * Decorator that replays and connects to the Observable returned from the function.
 * Caches the result using all arguments to form a key.
 * @param target
 * @param name
 * @param descriptor
 * @returns {PropertyDescriptor}
 */
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
  const originalFunc = descriptor.value;
  const cacheMap = new Map<string, any>();
  descriptor.value = function(this: any, ...args: any[]): any {
    const key = args.join('::');

    let returnValue = cacheMap.get(key);
    if (returnValue !== undefined) {
      console.log(`${name} cache-hit ${key}`, returnValue);
      return returnValue;
    }

    returnValue = originalFunc.apply(this, args);
    console.log(`${name} cache-miss ${key} new`, returnValue);
    if (returnValue instanceof Observable) {
      returnValue = returnValue.publishReplay(1);
      returnValue.connect();
    }
    else {
      console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
    }
    cacheMap.set(key, returnValue);
    return returnValue;
  };

  return descriptor;
}

Cześć @Arlo - powyższy przykład się nie kompiluje. Property 'connect' does not exist on type '{}'.z linii returnValue.connect();. Czy możesz rozwinąć?
Kopyto,

4

Dane odpowiedzi HTTP w pamięci podręcznej przy użyciu Rxjs Observer / Observable + Caching + Subskrypcja

Zobacz kod poniżej

* zastrzeżenie: Jestem nowy w rxjs, więc pamiętaj, że mogę niewłaściwie używać podejścia obserwowalnego / obserwatora. Moje rozwiązanie jest wyłącznie konglomeratem innych rozwiązań, które znalazłem, i jest konsekwencją tego, że nie znalazłem prostego, dobrze udokumentowanego rozwiązania. W ten sposób dostarczam moje kompletne rozwiązanie kodu (jak chciałbym to znaleźć) w nadziei, że pomoże innym.

* Uwaga, to podejście jest luźno oparte na GoogleFirebaseObservables. Niestety brakuje mi odpowiedniego doświadczenia / czasu, aby powtórzyć to, co zrobili pod maską. Ale poniżej przedstawiono uproszczony sposób zapewnienia asynchronicznego dostępu do niektórych danych, które mogą być buforowane.

Sytuacja : Zadaniem komponentu „lista produktów” jest wyświetlanie listy produktów. Witryna to jednostronna aplikacja internetowa z kilkoma przyciskami menu, które będą „filtrować” produkty wyświetlane na stronie.

Rozwiązanie : składnik „subskrybuje” metodę usługi. Metoda usługi zwraca tablicę obiektów produktu, do których komponent uzyskuje dostęp poprzez wywołanie zwrotne subskrypcji. Metoda usługi zawija swoją aktywność w nowo utworzonym Observer i zwraca obserwatora. Wewnątrz tego obserwatora wyszukuje dane w pamięci podręcznej i przekazuje je subskrybentowi (komponentowi) i zwraca. W przeciwnym razie wywołuje połączenie http w celu pobrania danych, subskrybuje odpowiedź, w której można przetworzyć te dane (np. Zmapować dane do własnego modelu), a następnie przekazać dane subskrybentowi.

Kod

product-list.component.ts

import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';

@Component({
  selector: 'app-product-list',
  templateUrl: './product-list.component.html',
  styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
  products: Product[];

  constructor(
    private productService: ProductService
  ) { }

  ngOnInit() {
    console.log('product-list init...');
    this.productService.getProducts().subscribe(products => {
      console.log('product-list received updated products');
      this.products = products;
    });
  }
}

product.service.ts

import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';

@Injectable()
export class ProductService {
  products: Product[];

  constructor(
    private http:Http
  ) {
    console.log('product service init.  calling http to get products...');

  }

  getProducts():Observable<Product[]>{
    //wrap getProducts around an Observable to make it async.
    let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
      //return products if it was previously fetched
      if(this.products){
        console.log('## returning existing products');
        observer.next(this.products);
        return observer.complete();

      }
      //Fetch products from REST API
      console.log('** products do not yet exist; fetching from rest api...');
      let headers = new Headers();
      this.http.get('http://localhost:3000/products/',  {headers: headers})
      .map(res => res.json()).subscribe((response:ProductResponse) => {
        console.log('productResponse: ', response);
        let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
        this.products = productlist;
        observer.next(productlist);
      });
    }); 
    return productsObservable$;
  }
}

product.ts (model)

export interface ProductResponse {
  success: boolean;
  msg: string;
  products: Product[];
}

export class Product {
  product_id: number;
  sku: string;
  product_title: string;
  ..etc...

  constructor(product_id: number,
    sku: string,
    product_title: string,
    ...etc...
  ){
    //typescript will not autoassign the formal parameters to related properties for exported classes.
    this.product_id = product_id;
    this.sku = sku;
    this.product_title = product_title;
    ...etc...
  }



  //Class method to convert products within http response to pure array of Product objects.
  //Caller: product.service:getProducts()
  static fromJsonList(products:any): Product[] {
    let mappedArray = products.map(Product.fromJson);
    return mappedArray;
  }

  //add more parameters depending on your database entries and constructor
  static fromJson({ 
      product_id,
      sku,
      product_title,
      ...etc...
  }): Product {
    return new Product(
      product_id,
      sku,
      product_title,
      ...etc...
    );
  }
}

Oto próbka danych wyjściowych, które widzę, gdy ładuję stronę w Chrome. Zauważ, że przy początkowym ładowaniu produkty są pobierane z http (połączenie z moją usługą odpoczynku węzła, która działa lokalnie na porcie 3000). Kiedy następnie klikam, aby przejść do widoku „filtrowanego” produktów, produkty znajdują się w pamięci podręcznej.

Mój dziennik Chrome (konsola):

core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init.  calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse:  {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products

... [kliknął przycisk menu, aby przefiltrować produkty] ...

app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products

Wniosek: Jest to najprostszy (jak dotąd) sposób zaimplementowania buforowanych danych odpowiedzi HTTP. W mojej aplikacji kątowej za każdym razem, gdy przechodzę do innego widoku produktów, składnik listy produktów ładuje się ponownie. ProductService wydaje się być współużytkowanym wystąpieniem, więc lokalna pamięć podręczna „products: Product []” w ProductService jest zachowywana podczas nawigacji, a kolejne wywołania „GetProducts ()” zwracają buforowaną wartość. I ostatnia uwaga, przeczytałem komentarze o tym, jak obserwowalne / subskrypcje muszą być zamknięte, kiedy skończysz, aby zapobiec „wyciekom pamięci”. Nie zamieściłem tego tutaj, ale warto o tym pamiętać.


1
Uwaga - od tego czasu znalazłem bardziej wydajne rozwiązanie, obejmujące RxJS BehaviorSubjects, które upraszcza kod i radykalnie zmniejsza „koszty ogólne”. W products.service.ts: 1. import {BehaviorSubject} z 'rxjs'; 2. zamień „produkty: Produkt []” na „produkt $: BehaviorSubject <Produkt []> = nowy BehaviorSubject <Produkt []> ([]);” 3. Teraz możesz po prostu zadzwonić na http bez zwracania czegokolwiek. http_getProducts () {this.http.get (...). map (res => res.json ()). subscribe (products => this.product $ .next (products))};
ObjectiveTC

1
Zmienna lokalna „produkt $” jest zachowaniem Podmiot, który będzie zarówno EMITOWAĆ, jak i PRZECHOWYWAĆ najnowsze produkty (z produktu $ .next (..) wywołanie w części 3). Teraz w swoich komponentach wstrzyknij usługę jak zwykle. Otrzymujesz ostatnio przypisaną wartość produktu $ za pomocą productService.product $ .value. Lub zasubskrybuj produkt $, jeśli chcesz wykonać akcję za każdym razem, gdy produkt $ otrzyma nową wartość (tj. Funkcja produktu $ .next (...) jest wywołana w części 3).
ObjectiveTC

1
Np. W products.component.ts ... this.productService.product $ .takeUntil (this.ngUnsubscribe) .subscribe ((products) => {this.category); niech filterProducts = this.productService.getProductsByCategory (this.category); this.products = filterProducts; });
ObjectiveTC

1
Ważna uwaga na temat rezygnacji z subskrypcji obserwowalnych: „.takeUntil (this.ngUnsubscribe)”. Zobacz pytanie / odpowiedź o przepełnieniu stosu, które wydaje się pokazywać zalecany „de facto” sposób wypisania się z wydarzeń: stackoverflow.com/questions/38008334/…
ObjectiveTC

1
Alternatywą jest .first () lub .take (1), jeśli obserwowalne ma tylko jeden raz odbierać dane. Wszystkie inne „nieskończone strumienie” obserwowalnych powinny być wypisane z „ngOnDestroy ()”, a jeśli tego nie zrobisz, możesz skończyć z podwójnymi „obserwowalnymi” wywołaniami zwrotnymi. stackoverflow.com/questions/28007777/…
ObjectiveTC

3

Zakładam, że @ ngx-cache / core może być użyteczny do utrzymania funkcji buforowania dla połączeń HTTP, szczególnie jeśli połączenie HTTP jest wykonywane zarówno w przeglądarce, jak i na serwerze platformie .

Powiedzmy, że mamy następującą metodę:

getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

Można użyć Cacheddekorator z @ NGX-cache / rdzeń do przechowywania wartości zwracane z metody podejmowania połączenia HTTP Pod cache storage( może być konfigurowalny, proszę sprawdzić wdrażania na NG-nasiennej / uniwersalny ) - Prawo w pierwszym wykonaniu. Przy następnym wywołaniu metody (bez względu na platformę przeglądarki lub serwera ) wartość jest pobierana z .storagecache storage

import { Cached } from '@ngx-cache/core';

...

@Cached('get-customer') // the cache key/identifier
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

Istnieje również możliwość stosowanie metod buforowania ( has, get, set) z wykorzystaniem API buforowania .

anyclass.ts

...
import { CacheService } from '@ngx-cache/core';

@Injectable()
export class AnyClass {
  constructor(private readonly cache: CacheService) {
    // note that CacheService is injected into a private property of AnyClass
  }

  // will retrieve 'some string value'
  getSomeStringValue(): string {
    if (this.cache.has('some-string'))
      return this.cache.get('some-string');

    this.cache.set('some-string', 'some string value');
    return 'some string value';
  }
}

Oto lista pakietów, zarówno do buforowania po stronie klienta, jak i serwera:


1

rxjs 5.3.0

Nie byłem zadowolony .map(myFunction).publishReplay(1).refCount()

W przypadku wielu subskrybentów w niektórych przypadkach .map()wykonuje się myFunctiondwukrotnie (spodziewam się, że wykona się tylko raz). Jedna poprawka wydaje się byćpublishReplay(1).refCount().take(1)

Inną rzeczą, którą możesz zrobić, to po prostu nie używać refCount()i natychmiast sprawić, by Observable był gorący:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

Spowoduje to uruchomienie żądania HTTP niezależnie od subskrybentów. Nie jestem pewien, czy anulowanie subskrypcji przed zakończeniem HTTP GET ją anuluje, czy nie.


1

Chcemy mieć pewność, że nie spowoduje to wielu żądań sieciowych.

Moim osobistym ulubionym jest korzystanie z asyncmetod połączeń, które wysyłają żądania sieciowe. Same metody nie zwracają wartości, zamiast tego aktualizująBehaviorSubject usługę ramach tej samej usługi, którą subskrybują komponenty.

Teraz po co używać BehaviorSubjectzamiast zamiast Observable? Ponieważ,

  • Po subskrypcji BehaviorSubject zwraca ostatnią wartość, podczas gdy normalny obserwowalny wyzwala się tylko, gdy otrzyma wartość onnext .
  • Jeśli chcesz pobrać ostatnią wartość BehaviorSubject w nieobserwowalnym kodzie (bez subskrypcji), możesz użyć tej getValue()metody.

Przykład:

customer.service.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

Następnie, gdziekolwiek jest to wymagane, możemy po prostu subskrybować customers$.

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

A może chcesz użyć go bezpośrednio w szablonie

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

Zatem teraz, dopóki nie wykonasz kolejnego połączenia getCustomers, dane są przechowywane w customers$BehaviorSubject.

Co jeśli chcesz odświeżyć te dane? po prostu zadzwoń dogetCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

Korzystając z tej metody, nie musimy jawnie zatrzymywać danych między kolejnymi połączeniami sieciowymi, ponieważ są one obsługiwane przez BehaviorSubject.

PS: Zazwyczaj, gdy element ulega zniszczeniu, dobrą praktyką jest pozbycie się subskrypcji, w tym celu możesz użyć metody sugerowanej w tej odpowiedzi.


1

Świetne odpowiedzi.

Lub możesz to zrobić:

To jest z najnowszej wersji rxjs. Korzystam z wersji RxJS 5.5.7

import {share} from "rxjs/operators";

this.http.get('/someUrl').pipe(share());

0

Po prostu wywołaj share () po mapie i przed subskrypcją .

W moim przypadku mam usługę ogólną (RestClientService.ts), która wykonuje wywołanie reszty, wyodrębnia dane, sprawdza błędy i wraca do obserwowalności do konkretnej usługi wdrożeniowej (np .: ContractClientService.ts), wreszcie ta konkretna implementacja zwraca obserwowalne do pliku ContractComponent.ts, a ten subskrybuje, aby zaktualizować widok.

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {

      public GetAll = (path: string, property: string): Observable<T[]> => {
        let fullPath = this.actionUrl + path;
        let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
        observable = observable.share();  //allows multiple subscribers without making again the http request
        observable.subscribe(
          (res) => {},
          error => this.handleError2(error, "GetAll", fullPath),
          () => {}
        );
        return observable;
      }

  private extractData(res: Response, property: string) {
    ...
  }
  private handleError2(error: any, method: string, path: string) {
    ...
  }

}

ContractService.ts:

export class ContractService extends RestClientService<Contract> {
  private GET_ALL_ITEMS_REST_URI_PATH = "search";
  private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
  public getAllItems(): Observable<Contract[]> {
    return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
  }

}

ContractComponent.ts:

export class ContractComponent implements OnInit {

  getAllItems() {
    this.rcService.getAllItems().subscribe((data) => {
      this.items = data;
   });
  }

}

0

Napisałem klasę pamięci podręcznej,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

Wszystko jest statyczne ze względu na to, jak go używamy, ale możesz uczynić go normalną klasą i usługą. Nie jestem jednak pewien, czy angular utrzymuje cały czas jedną instancję (nowość w Angular2).

I tak to wykorzystuję:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

Zakładam, że może istnieć bardziej sprytny sposób, który wymagałby pewnych Observablesztuczek, ale było to w porządku dla moich celów.


0

Wystarczy użyć tej warstwy pamięci podręcznej, robi wszystko, czego potrzebujesz, a nawet zarządzać pamięcią podręczną dla żądań ajax.

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

Jest to bardzo łatwe w użyciu

@Component({
    selector: 'home',
    templateUrl: './html/home.component.html',
    styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
    constructor(AjaxService:AjaxService){
        AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
    }

    articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

Warstwa (jako iniekcyjna usługa kątowa) jest

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
    public data:Object={};
    /*
    private dataObservable:Observable<boolean>;
     */
    private dataObserver:Array<any>=[];
    private loading:Object={};
    private links:Object={};
    counter:number=-1;
    constructor (private http: Http) {
    }
    private loadPostCache(link:string){
     if(!this.loading[link]){
               this.loading[link]=true;
               this.links[link].forEach(a=>this.dataObserver[a].next(false));
               this.http.get(link)
                   .map(this.setValue)
                   .catch(this.handleError).subscribe(
                   values => {
                       this.data[link] = values;
                       delete this.loading[link];
                       this.links[link].forEach(a=>this.dataObserver[a].next(false));
                   },
                   error => {
                       delete this.loading[link];
                   }
               );
           }
    }

    private setValue(res: Response) {
        return res.json() || { };
    }

    private handleError (error: Response | any) {
        // In a real world app, we might use a remote logging infrastructure
        let errMsg: string;
        if (error instanceof Response) {
            const body = error.json() || '';
            const err = body.error || JSON.stringify(body);
            errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
        } else {
            errMsg = error.message ? error.message : error.toString();
        }
        console.error(errMsg);
        return Observable.throw(errMsg);
    }

    postCache(link:string): Observable<Object>{

         return Observable.create(observer=> {
             if(this.data.hasOwnProperty(link)){
                 observer.next(this.data[link]);
             }
             else{
                 let _observable=Observable.create(_observer=>{
                     this.counter=this.counter+1;
                     this.dataObserver[this.counter]=_observer;
                     this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
                     _observer.next(false);
                 });
                 this.loadPostCache(link);
                 _observable.subscribe(status=>{
                     if(status){
                         observer.next(this.data[link]);
                     }
                     }
                 );
             }
            });
        }
}

0

To jest .publishReplay(1).refCount();lub.publishLast().refCount(); odkąd obserwacje Angular Http są kompletne na żądanie.

Ta prosta klasa buforuje wynik, dzięki czemu możesz subskrybować .value wiele razy i wysyła tylko 1 żądanie. Możesz także użyć funkcji .reload (), aby złożyć nowe żądanie i opublikować dane.

Możesz go używać w następujący sposób:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

i źródło:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

0

Możesz zbudować prostą klasę Cacheable <>, która pomaga zarządzać danymi pobranymi z serwera http przez wielu subskrybentów:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

Stosowanie

Zadeklaruj obiekt buforowalny <> (prawdopodobnie jako część usługi):

list: Cacheable<string> = new Cacheable<string>();

i przewodnik:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

Zadzwoń z komponentu:

//gets data from server
List.getData().subscribe(…)

Możesz subskrybować kilka komponentów.

Więcej szczegółów i przykład kodu można znaleźć tutaj: http://devinstance.net/articles/20171021/rxjs-cacheable


0

Możesz po prostu użyć pamięci podręcznej ngx ! To lepiej pasuje do twojego scenariusza.

Korzyści z korzystania z tego

  • Wywołuje reszta API tylko raz, buforuje odpowiedź i zwraca to samo dla kolejnych żądań.
  • Może wywoływać API zgodnie z wymaganiami po operacji tworzenia / aktualizacji / usuwania.

Twoja klasa usług byłaby mniej więcej taka -

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

Oto link, aby uzyskać więcej informacji.


-4

Czy próbowałeś uruchomić kod, który już masz?

Ponieważ konstruujesz Obserowalny z obietnicy wynikającej z tego getJSON(), żądanie sieciowe jest wysyłane zanim ktokolwiek zasubskrybuje. Otrzymana obietnica jest podzielana przez wszystkich subskrybentów.

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...

Zmieniłem pytanie, aby było specyficzne dla Angulara 2
Angular University
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.