0

我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清洁的方法。

我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息以返回特定组。我一生都无法弄清楚如何从 Observable 中获取这些值(不将它们分配给全局变量)。

train = 'ChooChoo'

with open(some_file) as fd:
    line_stream = Observable.from_(fd.readlines())

a_stream = line_stream.skip_while(
        # Begin at dictionary
        lambda x: 'config = {' not in x
    ).skip_while(
        # Begin at train key
        lambda x: "'" + train.lower() + "'" not in x
    ).take_while(
        # End at closing brace of dict value
        lambda x: '}' not in x
    ).filter(
        # Filter sdk and clang lines only
        lambda x: "'sdk'" in x or "'clang'" in x
    ).subscribe(lambda x: match_some_regex(x))

代替.subscribe()该流的末尾,我尝试使用.to_list()来获取一个列表,我可以在该列表上迭代“正常方式”,但它只返回一个类型的值:

<class 'rx.anonymousobservable.AnonymousObservable'>

我在这里做错了什么?

我见过的每个 Rx 示例都只打印结果。如果我希望它们在我可以同步使用的数据结构中怎么办?

4

1 回答 1

0

在短期内,我使用 itertools 实现了我想要的功能(正如@jonrsharpe 所建议的那样)。这个问题仍然困扰着我,所以我今天回过头来想通了。

这不是 Rx 的一个很好的例子,因为它只使用一个线程,但至少现在我知道如何在需要时突破“monad”。

#!/usr/bin/env python

from __future__ import print_function
from rx import *

def my_on_next(item):
    print(item, end="", flush=True)

def my_on_error(throwable):
    print(throwable)

def my_on_completed():
    print('Done')
    pass

def main():
    foo = []

    # Create an observable from a list of numbers
    a = Observable.from_([14, 9, 5, 2, 10, 13, 4])

    # Keep only the even numbers
    b = a.filter(lambda x: x % 2 == 0)

    # For every item, call a function that appends the item to a local list
    c = b.map(lambda x: foo.append(x))
    c.subscribe(lambda x: x, my_on_error, my_on_completed)

    # Use the list outside the monad!
    print(foo)

if __name__ == "__main__":
    main()

这个例子相当做作,所有的中间可观察对象都不是必需的,但它表明你可以轻松地做我最初描述的事情。

于 2016-05-22T02:28:51.197 回答