我有两个事件流。一个来自电感回路,另一个来自 IP 摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相隔 N 毫秒,我想将它们组合起来(汽车总是首先进入循环),但我也希望每个流中的不匹配事件(任何一个硬件都可能发生故障)都合并到一个流中。像这样的东西:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在,我当然可以通过良好的 ole Subject 反模式来解决问题:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅相当 hacky,而且虽然我没有观察到它,但我很确定当我使用threading.Timer
. 鉴于过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用的情况下做到这一点Subject
,但我无法弄清楚。如何做到这一点?
编辑
尽管出于组织和操作方面的原因,我更愿意坚持使用 Python,但我将采用 JavaScript rxjs 的答案并将其移植,甚至可能在 node.js 中重写整个脚本。