我正在模拟一些需要很长时间的输入的计算。每当完成计算(即on_next
发出)时,我希望将结果反应性地附加到结果数据帧并在on_completed
发出时打印最终的 DF。但是,DF 是空的,为什么没有值累积?
这适用于 Python 3.9.9 和 rxpy 3.2.0。
import time
from random import random, randint
from rx import create
import pandas as pd
import rx
print(rx.__version__)
def accumulate(result, i):
# Here the values should accumulate !?
result = result.append(pd.DataFrame({'a': [i]}))
def average_df(observer, scheduler):
for pid in pids:
time.sleep(random()*0.8)
observer.on_next(randint(0, 100))
observer.on_completed()
def print_result(result):
print(result)
# Client
if __name__ == "__main__":
result = pd.DataFrame({'a': []})
# Observable
pids = [1, 2, 3, 4]
source = create(average_df)
source.subscribe(
on_next = lambda i: accumulate(result, i),
on_error = lambda e: print("Error: {0}".format(e)),
on_completed = lambda: print_result(result)
)