0

我正在尝试制作开始和停止事件的主题,其中迟到的订阅者只会收到未完成的开始事件。IE。那些没有相应停止事件的人。

这是一些 RxPY 代码:

from rx.subjects import ReplaySubject

start = ReplaySubject()

start.subscribe(lambda x: print("subscriber1: " + str(x)))

start.on_next(("a", "start"))
start.on_next(("b", "start"))
start.on_next(("b", "stop"))

start.subscribe(lambda x: print("subscriber2: " + str(x)))

start.on_next(("c", "start"))

这给出了输出:

subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber2: ('b', 'start')
subscriber2: ('b', 'stop')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')

而我想:

subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')

我认为像扫描运算符这样的东西是必需的,但不能完全放在一起。感激地收到任何想法:)

4

1 回答 1

0

最干净的解决方案是使用主流的副作用来更新字典并将未完成的事件合并到新的订阅者。

class EventObserver(Observer):
  def __init__(self):
    self.cached_events = set()
    self.mirror = Subject() # re-emits all values

  on_next(self, value):
    self.mirror.next(value) # stream to late observers
    if(value[1] == 'stop'):
      try:
        self.cached_events.remove(value[0])
      except KeyError:
        pass
    else:
      self.cached_events.add(value[0])

  on_error(self, e):
    self.mirror.error(e) # + other error logic

  on_completed(self):
    self.mirror.complete() # + other completion logic

  late_subscribe(self, subscriber):
    return Observable.merge(
      Observable.from(list(self.cached_events)),
      self.mirror
    ).subscribe(subscriber)

使用如下:

event_observer = EventObserver()
events$.subscribe(event_observer)

# late subscription:
event_observer.late_subscribe(...)

答案的其余部分解释了为什么您可能更喜欢这种方法而不是被动方法。


反应式方法:

这是我能想到的最简单的解决方案,如果您不介意迟到的订阅者等到下一个活动。如您所见,它不是最漂亮的。

pub_events$ = events$.publish(); # in case your events$ aren't hot
replay_events$ = pub_events$.replay();

# late subscription:
replay_events$.window(events$.take(1))
              .scan(lambda is_first, o: 
                      o.reduce(lambda D, x: D.update({ x[0]: x[1] == 'stop' }) or D, {})
                       .flatMap(lambda D: Observable.from([ k for k, v in D.items() if v == False ]))
                      if is_first == True else o,
                    True)
              .flatMap(lambda o: o)

目标是使用从所有先前事件的缓存中构建的未完成事件的过滤列表开始延迟订阅。最大的障碍是ReplaySubject没有将这些缓存事件与新事件区分开来。解决上述问题的第一步是处理window下一个事件,期望ReplaySubject在此之前发出缓存的事件。由于您的要求听起来像是优化而不是正确性,因此这里的竞争条件可能没什么大不了的。

最多有两个窗口:一个是缓存的事件,一个是新的事件(如果有的话),所以scan稍微利用 Python 类型的弱点来检查我们在哪个窗口。如果是缓存的事件,我们构建事件键字典→该事件是否“停止”。最后一步是使用flatMap.

于 2017-11-05T22:08:05.487 回答