0

我有两段代码 st
- 一段在系统中生成一组活动警报流。
- 第二个消耗警报的引发/下降事件。

假设第一部分产生以下流
["a", "b"],
["c"],
["e", "f", "g"],
我想将它们推送为
("a", True),
("b", True),
("c", True),
("a", False),
("b", False),
("e", True),
("f", True),
("g", True),
("c", False).
到系统的第二部分。

我可以做到以下几点

events=[["a", "b"], ["c"], ["e", "f", "g"]]
alerts = Observable\
            .from_(events)\
            .map(lambda x : set(x))\
            .scan(lambda (prev, events), curr : (curr, {(i, True)  for i in curr - prev}.union(\
                                                       {(i, False) for i in prev - curr})),\
                  (set(), set()))\
            .map(lambda (prev, events) : events)

subject = rx.subjects.Subject()

def my_flatten(set):
    for x in set:
        subject.on_next(x)

subject.subscribe(lambda x : print(x))

alerts.subscribe(my_flatten)

产生以下结果,没关系

('a', True)
('b', True)
('b', False)
('a', False)
('c', True)
('c', False)
('g', True)
('e', True)
('f', True)

但我希望有一个没有主题的解决方案,如下所示

events=[["a", "b"], ["c"], ["e", "f", "g"]]

alerts = Observable\
            .from_(events)\
            .map(lambda x : set(x))\
            .scan(lambda (prev, events), curr : (curr, {(i, True)  for i in curr - prev}.union(\
                                                       {(i, False) for i in prev - curr})),\
                  (set(), set()))\
            .flat_map(lambda (prev, events) : events)
alerts.subscribe(lambda x : print(x))

alerts = Observable\ .from_(events)\ .map(lambda x : set(x))\ .scan(lambda (prev, events), curr : (curr, {(i, True) for i in curr - prev} .union({(i, False) for i in prev - curr})), (set(), set()))\ .map(lambda (prev, events) : events)

但它会产生以下内容,这是不正确的,因为您无法从中重建活动事件,最终c变为活动状态。

('a', True)
('b', True)
('b', False)
('a', False)
('c', False)
('c', True)
('g', True)
('e', True)
('f', True)

flat_map不保留顺序,您认为还有其他解决方案吗?

谢谢你,
迈克尔

4

0 回答 0