2

combineLatest用来将数据从一个对象websocket合并database到一个对象中。每次websocket发送数据时,我的数据都会刷新。尽管我也想通过按下按钮手动刷新页面上的数据。不幸combineLatest的是,没有检测到我HttpGet请求获取最新数据。

export class ShowTestDataComponent implements OnInit {
  stockData: RootObject;

  private stocksData$: WebSocketSubject<MessageEvent>;
  private stocksResources$: Observable<DbObject[]>;

  constructor(private stockService: StockService, private webSocketService: WebSocketServiceService) {
    this.stocksData$ = this.webSocketService.connect();
    this.stocksResources$ = this.stockService.getStocksResources();
  }

  buyStock(code, quantityToBuy): void {
    const dto: DbObject = {
      code: code,
      quantity: quantityToBuy
    };

    this.stockService.buyStock(dto).pipe(
      mergeMap(() => this.stockService.getStocksResources())
    ).subscribe((res) => this.stocksResources$ = of(res));    
  }

  prepareStockData(): void {
    combineLatest(
      this.stocksData$,
      this.stocksResources$,
      (stockData: RootObject, stockResources: DbObject[]) => ({
        stockData,
        stockResources
      })
    ).subscribe((pair) => {
      this.stockData = JSON.parse(pair.stockData.data);

      this.stockData.Items.forEach(item => {
        item.Quantity = pair.stockResources.find(d => d.code === item.Code).quantity;
      });
    });
  }

  ngOnInit() {
    this.prepareStockData();
  }
}

如您所见,如果我按下BUYUI,则将buyStock执行方法并执行this.stockService.buyStock(dto)更改 DB 中的数据,然后this.stockService.getStocksResources()刷新 DB 中的数据。一切正常,数据已更改但数据未刷新(comibneLatest 未检测到来自stocksResources$

这是我的服务,但我认为问题出在组件中。

export class StockService {
  apiUrl = "http://localhost:5001/api/values";
  constructor(private http: HttpClient) { }

  getStocksResources(): Observable<DbObject[]> {
    return this.http.get<DbObject[]>(this.apiUrl + "/stocks");
  }

  buyStock(data: DbObject){
    return this.http.post<DbObject>(this.apiUrl + "/quantity", data);
  }
}
4

1 回答 1

3

您必须创建新主题。所以声明:

private stocksResources$: Subject<DbObject[]> = new Subject();

在构造函数中,您必须stockResources使用您的发送接收到的数据Subject

constructor(private stockService: StockService) {
  this.stockService.getStocksResources().subscribe(res => {
    this.stocksResources$.next(res);
  });
}

每次您购买新股票时,您都必须再次发布您的数据。

this.stockService.buyStock(dto).pipe(
  mergeMap(() => this.stockService.getStocksResources())
).subscribe((res) => {
  this.stocksResources$.next(res);
});

就是这样:)

整个代码:

export class ShowTestDataComponent implements OnInit {
  stockData: RootObject;

  private stocksData$: WebSocketSubject<MessageEvent>;
  private stocksResources$: Subject<DbObject[]> = new Subject();

  constructor(private stockService: StockService, private webSocketService: WebSocketServiceService) {
    this.stocksData$ = this.webSocketService.connect();
    this.stockService.getStocksResources().subscribe(res => {
      this.stocksResources$.next(res);
    });
  }

  buyStock(code, quantityToBuy): void {
    const dto: DbObject = {
      code: code,
      quantity: quantityToBuy
    };

    this.stockService.buyStock(dto).pipe(
      mergeMap(() => this.stockService.getStocksResources())
    ).subscribe((res) => {
      this.stocksResources$.next(res);
    });
  }

  prepareStockData(): void {
    combineLatest(
      this.stocksData$,
      this.stocksResources$,
      (stockData: RootObject, stockResources: DbObject[]) => ({
        stockData,
        stockResources
      })
    ).subscribe((pair) => {
      this.stockData = JSON.parse(pair.stockData.data);

      this.stockData.Items.forEach(item => {
        item.Quantity = pair.stockResources.find(d => d.code === item.Code).quantity;
      });
    });
  }

  ngOnInit() {
    this.prepareStockData();
  }
}
于 2018-06-17T15:56:27.390 回答