336

通过使用 Http,我们调用了一个执行网络调用并返回 http observable 的方法:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们采用这个 observable 并向它添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这可能看起来像一个不寻常的场景,但实际上很常见:例如,如果调用者订阅 observable 以显示错误消息,并使用异步管道将其传递给模板,我们已经有两个订阅者。

在 RxJs 5 中这样做的正确方法是什么?

也就是说,这似乎工作正常:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但这是在 RxJs 5 中这样做的惯用方式,还是我们应该做其他事情?

注意:根据 Angular 5 new HttpClient.map(res => res.json())所有示例中的部分现在都是无用的,因为现在默认采用 JSON 结果。

4

21 回答 21

240

编辑:截至 2021 年,正确的方法是使用shareReplayRxJs 原生提出的运算符。在下面的答案中查看更多详细信息。


缓存数据,如果缓存可用,则返回此数据,否则发出 HTTP 请求。

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';

@Injectable()
export class DataService {
  private url: string = 'https://cors-test.appspot.com/test';
  
  private data: Data;
  private observable: Observable<any>;

  constructor(private http: Http) {}

  getData() {
    if(this.data) {
      // if `data` is available just return it as `Observable`
      return Observable.of(this.data); 
    } else if(this.observable) {
      // if `this.observable` is set then the request is in progress
      // return the `Observable` for the ongoing request
      return this.observable;
    } else {
      // example header (not necessary)
      let headers = new Headers();
      headers.append('Content-Type', 'application/json');
      // create the request, store the `Observable` for subsequent subscribers
      this.observable = this.http.get(this.url, {
        headers: headers
      })
      .map(response =>  {
        // when the cached data is available we don't need the `Observable` reference anymore
        this.observable = null;

        if(response.status == 400) {
          return "FAILURE";
        } else if(response.status == 200) {
          this.data = new Data(response.json());
          return this.data;
        }
        // make it shared so more than one subscriber can get the result
      })
      .share();
      return this.observable;
    }
  }
}

Plunker 示例

这篇文章https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html很好地解释了如何使用shareReplay.

于 2016-03-29T17:56:32.160 回答
49

根据@Cristian 的建议,这是一种适用于 HTTP 可观察对象的方法,它只发出一次然后完成:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}
于 2016-03-29T22:09:46.997 回答
37

UPDATE: Ben Lesh says the next minor release after 5.2.0, you'll be able to just call shareReplay() to truly cache.

PREVIOUSLY.....

Firstly, don't use share() or publishReplay(1).refCount(), they are the same and the problem with it, is that it only shares if connections are made while the observable is active, if you connect after it completes, it creates a new observable again, translation, not really caching.

Birowski gave the right solution above, which is to use ReplaySubject. ReplaySubject will caches the values you give it (bufferSize) in our case 1. It will not create a new observable like share() once refCount reaches zero and you make a new connection, which is the right behavior for caching.

Here's a reusable function

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

Here's how to use it

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

Below is a more advance version of the cacheable function This one allows has its own lookup table + the ability to provide a custom lookup table. This way, you don't have to check this._cache like in the above example. Also notice that instead of passing the observable as the first argument, you pass a function which returns the observables, this is because Angular's Http executes right away, so by returning a lazy executed function, we can decide not to call it if it's already in our cache.

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

Usage:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
于 2017-03-23T01:28:40.617 回答
35

rxjs 5.4.0有一个新的shareReplay方法。

作者明确表示“非常适合处理缓存 AJAX 结果之类的事情”

rxjs PR #2443 feat(shareReplay): 添加shareReplay变体publishReplay

shareReplay 返回一个 observable,它是通过 ReplaySubject 多播的源。该重播主题会在源错误时回收,但不会在源完成时回收。这使得 shareReplay 非常适合处理诸如缓存 AJAX 结果之类的事情,因为它是可重试的。然而,它的重复行为与 share 不同,它不会重复源 observable,而是重复源 observable 的值。

于 2017-05-12T17:15:22.200 回答
30

according to this article

It turns out we can easily add caching to the observable by adding publishReplay(1) and refCount.

so inside if statements just append

.publishReplay(1)
.refCount();

to .map(...)

于 2016-06-10T20:17:12.287 回答
25

rxjs 版本 5.4.0 (2017-05-09)添加了对shareReplay的支持。

为什么要使用 shareReplay?

当您不希望在多个订阅者之间执行的副作用或繁重的计算时,您通常希望使用 shareReplay。在您知道您将有迟到的订阅者需要访问先前发出的值的流的情况下,它也可能很有价值。这种在订阅时重播价值的能力是 share 和 shareReplay 的区别。

您可以轻松地修改一个角度服务来使用它并返回一个带有缓存结果的可观察对象,该结果只会进行一次 http 调用(假设第一次调用成功)。

示例 Angular 服务

这是一个非常简单的客户服务,它使用shareReplay.

客户服务.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable({providedIn: 'root'})
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }
    
    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}

请注意,构造函数中的赋值可以移动到方法中getCustomers,但是由于从返回的 observablesHttpClient是“冷的”,因此在构造函数中这样做是可以接受的,因为 http 调用只会在第一次调用subscribe.

这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。

于 2019-02-28T20:53:22.800 回答
10

我为这个问题加了星标,但我会尝试解决这个问题。

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

这是证据:)

只有一个要点:getCustomer().subscribe(customer$)

我们没有订阅 的 api 响应getCustomer(),我们订阅的是可观察的 ReplaySubject,它也能够订阅不同的 Observable 并且(这很重要)保存它的最后发出的值并将其重新发布到它的任何一个(ReplaySubject 的) 订阅者。

于 2016-03-29T21:59:49.037 回答
8

我找到了一种将 http get 结果存储到 sessionStorage 并将其用于会话的方法,这样它就不会再调用服务器了。

我用它来调用 github API 以避免使用限制。

@Injectable()
export class HttpCache {
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    let cached: any;
    if (cached === sessionStorage.getItem(url)) {
      return Observable.of(JSON.parse(cached));
    } else {
      return this.http.get(url)
        .map(resp => {
          sessionStorage.setItem(url, resp.text());
          return resp.json();
        });
    }
  }
}

仅供参考,sessionStorage 限制为 5M(或 4.75M)。因此,它不应该像这样用于大量数据。

------ 编辑 -------------
如果你想用 F5 刷新数据,它使用内存数据而不是 sessionStorage;

@Injectable()
export class HttpCache {
  cached: any = {};  // this will store data
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    if (this.cached[url]) {
      return Observable.of(this.cached[url]));
    } else {
      return this.http.get(url)
        .map(resp => {
          this.cached[url] = resp.text();
          return resp.json();
        });
    }
  }
}
于 2016-07-26T22:25:37.810 回答
6

您选择的实现将取决于您是否希望 unsubscribe() 取消您的 HTTP 请求。

在任何情况下,TypeScript 装饰器都是标准化行为的好方法。这是我写的:

  @CacheObservableArgsKey
  getMyThing(id: string): Observable<any> {
    return this.http.get('things/'+id);
  }

装饰器定义:

/**
 * Decorator that replays and connects to the Observable returned from the function.
 * Caches the result using all arguments to form a key.
 * @param target
 * @param name
 * @param descriptor
 * @returns {PropertyDescriptor}
 */
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
  const originalFunc = descriptor.value;
  const cacheMap = new Map<string, any>();
  descriptor.value = function(this: any, ...args: any[]): any {
    const key = args.join('::');

    let returnValue = cacheMap.get(key);
    if (returnValue !== undefined) {
      console.log(`${name} cache-hit ${key}`, returnValue);
      return returnValue;
    }

    returnValue = originalFunc.apply(this, args);
    console.log(`${name} cache-miss ${key} new`, returnValue);
    if (returnValue instanceof Observable) {
      returnValue = returnValue.publishReplay(1);
      returnValue.connect();
    }
    else {
      console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
    }
    cacheMap.set(key, returnValue);
    return returnValue;
  };

  return descriptor;
}
于 2017-05-09T20:38:18.223 回答
5

使用 Rxjs Observer/Observable + 缓存 + 订阅的可缓存 HTTP 响应数据

请参阅下面的代码

*免责声明:我是 rxjs 的新手,所以请记住,我可能会滥用可观察/观察者方法。我的解决方案纯粹是我找到的其他解决方案的集合,并且是未能找到一个简单的、有据可查的解决方案的结果。因此,我提供了完整的代码解决方案(正如我希望找到的那样),希望它可以帮助其他人。

*注意,这种方法松散地基于 GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在幕后所做的事情。但以下是提供对某些可缓存数据的异步访问的简单方法。

情况:“产品列表”组件的任务是显示产品列表。该网站是一个单页网络应用程序,带有一些菜单按钮,可以“过滤”页面上显示的产品。

解决方案:组件“订阅”服务方法。service 方法返回一个产品对象数组,组件通过订阅回调访问这些对象。服务方法将其活动包装在一个新创建的观察者中并返回观察者。在这个观察者内部,它搜索缓存的数据并将其传递回订阅者(组件)并返回。否则,它会发出一个 http 调用来检索数据,订阅响应,您可以在其中处理该数据(例如将数据映射到您自己的模型),然后将数据传回给订阅者。

编码

产品列表.component.ts

import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';

@Component({
  selector: 'app-product-list',
  templateUrl: './product-list.component.html',
  styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
  products: Product[];

  constructor(
    private productService: ProductService
  ) { }

  ngOnInit() {
    console.log('product-list init...');
    this.productService.getProducts().subscribe(products => {
      console.log('product-list received updated products');
      this.products = products;
    });
  }
}

产品.service.ts

import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';

@Injectable()
export class ProductService {
  products: Product[];

  constructor(
    private http:Http
  ) {
    console.log('product service init.  calling http to get products...');

  }

  getProducts():Observable<Product[]>{
    //wrap getProducts around an Observable to make it async.
    let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
      //return products if it was previously fetched
      if(this.products){
        console.log('## returning existing products');
        observer.next(this.products);
        return observer.complete();

      }
      //Fetch products from REST API
      console.log('** products do not yet exist; fetching from rest api...');
      let headers = new Headers();
      this.http.get('http://localhost:3000/products/',  {headers: headers})
      .map(res => res.json()).subscribe((response:ProductResponse) => {
        console.log('productResponse: ', response);
        let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
        this.products = productlist;
        observer.next(productlist);
      });
    }); 
    return productsObservable$;
  }
}

product.ts(模型)

export interface ProductResponse {
  success: boolean;
  msg: string;
  products: Product[];
}

export class Product {
  product_id: number;
  sku: string;
  product_title: string;
  ..etc...

  constructor(product_id: number,
    sku: string,
    product_title: string,
    ...etc...
  ){
    //typescript will not autoassign the formal parameters to related properties for exported classes.
    this.product_id = product_id;
    this.sku = sku;
    this.product_title = product_title;
    ...etc...
  }



  //Class method to convert products within http response to pure array of Product objects.
  //Caller: product.service:getProducts()
  static fromJsonList(products:any): Product[] {
    let mappedArray = products.map(Product.fromJson);
    return mappedArray;
  }

  //add more parameters depending on your database entries and constructor
  static fromJson({ 
      product_id,
      sku,
      product_title,
      ...etc...
  }): Product {
    return new Product(
      product_id,
      sku,
      product_title,
      ...etc...
    );
  }
}

这是我在 Chrome 中加载页面时看到的输出示例。请注意,在初始加载时,产品是从 http 获取的(调用我的节点休息服务,该服务在端口 3000 上本地运行)。然后,当我单击导航到产品的“过滤”视图时,会在缓存中找到产品。

我的 Chrome 日志(控制台):

core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init.  calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse:  {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products

...[单击菜单按钮过滤产品]...

app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products

结论:这是我(到目前为止)发现的实现可缓存 http 响应数据的最简单方法。在我的 Angular 应用程序中,每次我导航到产品的不同视图时,产品列表组件都会重新加载。ProductService 似乎是一个共享实例,所以导航时会保留 ProductService 中 'products: Product[]' 的本地缓存,后续调用“GetProducts()”会返回缓存的值。最后一点,我已经阅读了有关在完成后如何关闭可观察对象/订阅以防止“内存泄漏”的评论。我没有在此处包含此内容,但请记住这一点。

于 2017-09-02T07:46:27.260 回答
3

我们要做的是确保这不会导致多个网络请求。

我个人最喜欢的是使用async方法来发出网络请求。方法本身不返回值,而是BehaviorSubject在同一服务中更新 a,组件将订阅这些服务。

现在为什么使用 aBehaviorSubject而不是 a Observable?因为,

  • 订阅后 BehaviorSubject 返回最后一个值,而常规 observable 仅在收到onnext.
  • 如果要在不可观察的代码中(没有订阅)检索 BehaviorSubject 的最后一个值,可以使用该getValue()方法。

例子:

客户服务.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

然后,只要需要,我们就可以订阅customers$.

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

或者,也许您想直接在模板中使用它

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

所以现在,在您再次调用 之前getCustomers,数据将保留在customers$BehaviorSubject 中。

那么如果你想刷新这些数据呢?打电话给getCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

使用这种方法,我们不必在后续网络调用之间显式保留数据,因为它是由BehaviorSubject.

PS:通常当组件被销毁时,摆脱订阅是一个好习惯,因为您可以使用答案中建议的方法。

于 2018-03-24T01:36:31.437 回答
3

我认为@ngx-cache/core对于维护 http 调用的缓存功能可能很有用,尤其是在浏览器服务器平台上都进行 HTTP 调用的情况下。

假设我们有以下方法:

getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

您可以使用@ngx-cache/coreCached的装饰器来存储在一次执行时进行 HTTP 调用的方法返回的值(可以配置,请检查ng-seed/universal的实现)。下次调用该方法时(无论是在浏览器还是服务器平台上),都会从.cache storagestoragecache storage

import { Cached } from '@ngx-cache/core';

...

@Cached('get-customer') // the cache key/identifier
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

还可以使用缓存 API来使用缓存方法 ( has, get, set) 。

任何类.ts

...
import { CacheService } from '@ngx-cache/core';

@Injectable()
export class AnyClass {
  constructor(private readonly cache: CacheService) {
    // note that CacheService is injected into a private property of AnyClass
  }

  // will retrieve 'some string value'
  getSomeStringValue(): string {
    if (this.cache.has('some-string'))
      return this.cache.get('some-string');

    this.cache.set('some-string', 'some string value');
    return 'some string value';
  }
}

以下是客户端和服务器端缓存的包列表:

于 2017-05-03T07:03:12.217 回答
2

您可以构建简单的类 Cacheable<> 来帮助管理从具有多个订阅者的 http 服务器检索的数据:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

用法

声明 Cacheable<> 对象(大概作为服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

和处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件调用:

//gets data from server
List.getData().subscribe(…)

您可以订阅多个组件。

更多细节和代码示例在这里:http ://devinstance.net/articles/20171021/rxjs-cacheable

于 2017-11-27T04:26:54.743 回答
2

很好的答案。

或者你可以这样做:

这是来自最新版本的 rxjs。我正在使用5.5.7版本的RxJS

import {share} from "rxjs/operators";

this.http.get('/someUrl').pipe(share());
于 2018-03-29T15:54:48.140 回答
1

它是.publishReplay(1).refCount();.publishLast().refCount();因为 Angular Http 可观察对象在请求后完成。

这个简单的类缓存了结果,因此您可以多次订阅 .value 并且只发出 1 个请求。您还可以使用 .reload() 发出新请求并发布数据。

你可以像这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

和来源:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}
于 2017-11-14T22:40:24.307 回答
1

rxjs 5.3.0

我一直不开心.map(myFunction).publishReplay(1).refCount()

对于多个订阅者,在某些情况下会.map()执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)

您可以做的另一件事就是不要refCount()立即使用 Observable 并使其变热:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

无论订阅者如何,这都会启动 HTTP 请求。我不确定在 HTTP GET 完成之前取消订阅是否会取消它。

于 2017-04-22T00:26:49.593 回答
0

你可以简单地使用ngx-cacheable!它更适合您的场景。

使用这个的好处

  • 它只调用一次rest API,缓存响应并为后续请求返回相同的响应。
  • 创建/更新/删除操作后可以根据需要调用API。

因此,您的服务类将是这样的 -

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

是更多参考的链接。

于 2018-11-19T14:26:14.107 回答
0

只需使用这个缓存层,它就可以满足您的所有需求,甚至可以管理 ajax 请求的缓存。

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

这很容易使用

@Component({
    selector: 'home',
    templateUrl: './html/home.component.html',
    styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
    constructor(AjaxService:AjaxService){
        AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
    }

    articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

该层(作为可注入的角度服务)是

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
    public data:Object={};
    /*
    private dataObservable:Observable<boolean>;
     */
    private dataObserver:Array<any>=[];
    private loading:Object={};
    private links:Object={};
    counter:number=-1;
    constructor (private http: Http) {
    }
    private loadPostCache(link:string){
     if(!this.loading[link]){
               this.loading[link]=true;
               this.links[link].forEach(a=>this.dataObserver[a].next(false));
               this.http.get(link)
                   .map(this.setValue)
                   .catch(this.handleError).subscribe(
                   values => {
                       this.data[link] = values;
                       delete this.loading[link];
                       this.links[link].forEach(a=>this.dataObserver[a].next(false));
                   },
                   error => {
                       delete this.loading[link];
                   }
               );
           }
    }

    private setValue(res: Response) {
        return res.json() || { };
    }

    private handleError (error: Response | any) {
        // In a real world app, we might use a remote logging infrastructure
        let errMsg: string;
        if (error instanceof Response) {
            const body = error.json() || '';
            const err = body.error || JSON.stringify(body);
            errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
        } else {
            errMsg = error.message ? error.message : error.toString();
        }
        console.error(errMsg);
        return Observable.throw(errMsg);
    }

    postCache(link:string): Observable<Object>{

         return Observable.create(observer=> {
             if(this.data.hasOwnProperty(link)){
                 observer.next(this.data[link]);
             }
             else{
                 let _observable=Observable.create(_observer=>{
                     this.counter=this.counter+1;
                     this.dataObserver[this.counter]=_observer;
                     this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
                     _observer.next(false);
                 });
                 this.loadPostCache(link);
                 _observable.subscribe(status=>{
                     if(status){
                         observer.next(this.data[link]);
                     }
                     }
                 );
             }
            });
        }
}
于 2017-01-23T10:52:23.517 回答
0

只需在map之后和任何subscribe之前调用share()

就我而言,我有一个通用服务 (RestClientService.ts),它正在进行其余调用、提取数据、检查错误并将 observable 返回到具体实现服务(例如:ContractClientService.ts),最后是这个具体实现返回可观察到的 de ContractComponent.ts,并且这个订阅更新视图。

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {

      public GetAll = (path: string, property: string): Observable<T[]> => {
        let fullPath = this.actionUrl + path;
        let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
        observable = observable.share();  //allows multiple subscribers without making again the http request
        observable.subscribe(
          (res) => {},
          error => this.handleError2(error, "GetAll", fullPath),
          () => {}
        );
        return observable;
      }

  private extractData(res: Response, property: string) {
    ...
  }
  private handleError2(error: any, method: string, path: string) {
    ...
  }

}

合同服务.ts:

export class ContractService extends RestClientService<Contract> {
  private GET_ALL_ITEMS_REST_URI_PATH = "search";
  private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
  public getAllItems(): Observable<Contract[]> {
    return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
  }

}

合同组件.ts:

export class ContractComponent implements OnInit {

  getAllItems() {
    this.rcService.getAllItems().subscribe((data) => {
      this.items = data;
   });
  }

}
于 2016-05-23T09:43:45.613 回答
0

我写了一个缓存类,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

由于我们使用它的方式,它都是静态的,但请随意将其设为普通类和服务。我不确定 Angular 是否一直保留一个实例(Angular2 的新手)。

这就是我使用它的方式:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

我认为可能有一种更聪明的方法,它会使用一些Observable技巧,但这对我的目的来说很好。

于 2016-12-16T14:40:34.427 回答
-5

您是否尝试过运行已有的代码?

因为您是从 产生的承诺构造 Observable getJSON(),所以网络请求是在任何人订阅之前发出的。由此产生的承诺由所有订阅者共享。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
于 2016-03-29T13:48:17.340 回答