4

我想知道在 Rx 中解决以下问题的规范方法是什么:假设我有两个 observablesmouse_downmouse_up,其元素代表鼠标按钮按下。在一个非常简单的场景中,如果我想检测长按,我可以通过以下方式进行(在这种情况下使用 RxPy,但在任何 Rx 实现中概念上都是相同的):

mouse_long_press = mouse_down.delay(1000).take_until(mouse_up).repeat()

但是,当我们需要将一些信息从mouse_down可观察对象提升到可观察对象时,就会出现问题mouse_up。例如,考虑是否可观察的元素存储了有关按下了哪个鼠标按钮的信息。显然,我们只想mouse_downmouse_up相应的按钮配对。我想出的一个解决方案是:

mouse_long_press = mouse_down.select_many(lambda x:
    rx.Observable.just(x).delay(1000)\
        .take_until(mouse_up.where(lambda y: x.button == y.button))
)

如果有更直接的解决方案,我很想听听 - 但据我所知,这是可行的。但是,如果我们还想检测鼠标在mouse_down和之间移动了多远,事情就会变得更加复杂mouse_up。为此,我们需要引入一个新的 observable mouse_move,它携带有关鼠标位置的信息。

mouse_long_press = mouse_down.select_many(lambda x:
    mouse_move.select(lambda z: distance(x, z) > 100).delay(1000)\
        .take_until(mouse_up.where(lambda y: x.button == y.button))
)

但是,这几乎是我卡住的地方。每当一个按钮被按住超过 1 秒时,我都会得到一堆布尔值。但是,我只想在它们全部为假时检测长按,这听起来像是all 运算符的完美案例。感觉好像只少了一小步,但到目前为止我还没有弄清楚如何让它工作。也许我也在以一种倒退的方式做事。期待任何建议。

4

2 回答 2

1

好的,我想我找到了一个可能的答案。RxPy 有一个take_with_time操作符,用于此目的。并不像我希望的那样直截了当(不确定take_with_time在其他 Rx 实现中是否可用)。

mouse_long_press = mouse_down.select_many(lambda x:
    mouse_moves.take_with_time(1000).all(lambda z: distance(x, z) < 100)\
        .take_until(mouse_up.where(lambda y: x.button == y.button))
)

如果有人有更好的建议,我将暂时保留这个问题。

于 2015-04-22T06:06:50.660 回答
0

我会以不同的方式解决这个问题,方法是创建一个带有长度信息的鼠标按下流,并为超过 1 秒的按下进行过滤。


首先让我们假设您只有一个鼠标按钮。合并mouse_upmouse_down流并与操作员分配它们之间的时间间隔time_interval()。您将获得自上一个事件以来的间隔流以及事件本身。假设您的鼠标上下交替交替,这意味着您现在的事件是:

(down + time since last up), (up + time since last down), (down + time since last up) ...

现在,只需过滤x.value.type == "up" and x.interval > datetime.timedelta(seconds=1)

(您也可以使用 来验证这一点pairwise(),它始终为您提供当前 + 上一个事件,因此您可以检查前一个事件是否已关闭,当前事件是否已启动。)


、添加鼠标移动信息,使用window()操作符。

(这部分未经测试,我将关闭它应该如何表现的文档,但文档不是很清楚。所以 YMMV。)

这个想法是,您可以从一个可观察对象收集事件序列,并根据另一个可观察对象分成组。来自文档: observable窗口(window_openings)window_openings是合并的上/下流,或间隔流,以更方便者为准。然后您可以flat_map()(或select_many,这是同一件事)结果并以您喜欢的任何方式计算距离。

同样,您应该最终得到up/down 事件之间的距离流。然后,您可以zip()使用间隔流将此流,此时您可以过滤向上事件并获取时间和距离,直到前一个向下。


第三,如果您正在获取多个鼠标按钮的事件怎么办?

只需使用group_by()运算符拆分为每个按钮的流并按上述方式进行。

完整代码如下:

Event = collections.NamedTuple("Event", "event interval distance")

def sum_distance(move_stream):
    # put your distance calculation here; something like:
    return move_stream.pairwise().reduce(lambda acc, (a, b): acc + distance(a, b), 0)

def mouse_press(updown_stream):
    # shared stream for less duplication
    shared = updown_stream.share()
    intervals = shared.time_interval()  # element is: (interval=timedelta, value=original event)
    distances = mouse_move.window(shared).flat_map(sum_distance)
    zipped = intervals.zip(distances, lambda i, d: \
        Event(i.value, i.interval, d) )

mouse_long_press = (
    # merge the mouse streams
    rx.Observable.merge(mouse_up, mouse_down)
    # separate into streams for each button
    .group_by(lambda x: x.button)
    # create per-button event streams per above and merge results back
    .flat_map(mouse_press)
    # filter by event type and length
    .filter(lambda ev: ev.event.type == "up" and ev.interval >= datetime.timedelta(seconds=1)
)
于 2018-08-15T08:52:17.613 回答