0

尝试使用 RxJS v5 构建计划,其中某些事件可以触发计划重新加载。当前使用 3 个源 - schedule$、event$ 和 userNotification$(示例如下)。

我已经尝试了许多不同的策略,并且当 reloadSchedule 事件时间命中时,我一直很奇怪,比如递归重新加载。有没有办法让下游数据 (event$) 干净地触发上游 (schedule$) 重新加载,而不会有任何动作/通知从以前的计划项目中挥之不去?

schedule$ = new Rx.BehaviorSubject(
  {schedule:[
    {start:'1pm', end:'2pm', action:'sayhi'},
    {start:'2pm', end:'3pm', action:'sayhi'},
    {start:'3pm', end:'3pm', action:'reloadSchedule'},
    {start:'3:01pm', end:'4pm', action:'sayhi'},
  ]}
);

function loadSchedule(){
  somethingAsync.then((moreData)=>schedule$.next(moreData));
}

event$ = schedule$.flatMap((data)=>{
  return Rx.Observable
    .from(data.schedule)
    .flatMap((event)=>{
      return Rx.Observable.timer(event.start)
      .flatMap(()=>{
        // do actions here once previous actions/notifications finish
        if(event.action === 'reloadSchedule'){
          loadSchedule()
        }
        return Rx.Observable.of(someUserMessage);
      })
    })
})

userNotification$ = Rx.Observable.timer(1000).withLatestFrom(event$)
.flatMap((someUserMessage)={
  // fade message after 5 seconds
});

userNotification.subscribe(()=>{});
4

1 回答 1

0

最终想出了一个解决方案。可能有更清洁的方法可以做到这一点,但它奏效了。

基本思想是有一个计时器来控制动作。将事件时间与该计时器进行比较以获得正确的当前事件。在需要重新加载时取消订阅。

粗略的例子。

// start and end are ISO strings - showing 1pm etc.
let schedule$ = new Rx.BehaviorSubject([
  {start:'1pm', end:'2pm', action:'sayhi'},
  {start:'2pm', end:'3pm', action:'sayhi'},
  {start:'3pm', end:'3pm', action:'reloadSchedule'},
  {start:'3:01pm', end:'4pm', action:'sayhi'},
]);

schedule$.subscribe((sched)=>{
  new Scheduler(sched)
});

function loadSchedule(){
  somethingAsync.then((moreData)=>schedule$.next(moreData));
}

class Scheduler{
  constructor(schedule){
    let notificationsCleared = true;
    let sliced;
    let event$ = Rx.Observable
      .timer(1000)
      .filter(()=>notificationsCleared)
      .map(()=>{
        let now = (new Date()).toISOString();
        sliced || (sliced = schedule.slice(0));
        while (now > sliced[0].end){
          sliced.shift();
        }
        return sliced[0];
      }).share();

    let cleanup$ = event$.filter((evt)=>evt.action === 'reloadSchedule')

    let userNotification$ = event$.map(()=>{
      notificationsCleared = false;
      someAsyncNotification()
      .then(()=>notificationsCleared = true)
    });

    let userSub = userNotification.subscribe(()=>{});
    let cleanupSub = cleanup$.subscribe(()=>{
      loadSchedule();
      userSub.unsubscribe();
      cleanupSub.unsubscribe();
    });
  }
};
于 2016-03-26T16:41:25.623 回答