我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清洁的方法。
我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息以返回特定组。我一生都无法弄清楚如何从 Observable 中获取这些值(不将它们分配给全局变量)。
train = 'ChooChoo'
with open(some_file) as fd:
line_stream = Observable.from_(fd.readlines())
a_stream = line_stream.skip_while(
# Begin at dictionary
lambda x: 'config = {' not in x
).skip_while(
# Begin at train key
lambda x: "'" + train.lower() + "'" not in x
).take_while(
# End at closing brace of dict value
lambda x: '}' not in x
).filter(
# Filter sdk and clang lines only
lambda x: "'sdk'" in x or "'clang'" in x
).subscribe(lambda x: match_some_regex(x))
代替.subscribe()
该流的末尾,我尝试使用.to_list()
来获取一个列表,我可以在该列表上迭代“正常方式”,但它只返回一个类型的值:
<class 'rx.anonymousobservable.AnonymousObservable'>
我在这里做错了什么?
我见过的每个 Rx 示例都只打印结果。如果我希望它们在我可以同步使用的数据结构中怎么办?