0

我正在模拟一些需要很长时间的输入的计算。每当完成计算(即on_next发出)时,我希望将结果反应性地附加到结果数据帧并在on_completed发出时打印最终的 DF。但是,DF 是空的,为什么没有值累积?

这适用于 Python 3.9.9 和 rxpy 3.2.0。

import time
from random import random, randint
from rx import create
import pandas as pd
import rx
print(rx.__version__)

def accumulate(result, i):
    # Here the values should accumulate !?
    result = result.append(pd.DataFrame({'a': [i]}))

def average_df(observer, scheduler):
    for pid in pids:
        time.sleep(random()*0.8)
        observer.on_next(randint(0, 100))
    observer.on_completed()

def print_result(result):
    print(result)

# Client
if __name__ == "__main__":
    result = pd.DataFrame({'a': []})

    # Observable
    pids = [1, 2, 3, 4]
    source = create(average_df)
    
    source.subscribe(
        on_next = lambda i: accumulate(result, i),
        on_error = lambda e: print("Error: {0}".format(e)),
        on_completed = lambda: print_result(result)
    )
4

1 回答 1

0

pandas append 函数返回一个新对象。在累积中,这个新对象被设置为局部变量result。这不会更新主块中的结果变量。

借助扫描运算符,您可以通过返回每一步的累积值来获得最终数据帧:

import time
from random import random, randint
import rx
import rx.operators as ops
import pandas as pd
import rx
print(rx.__version__)

def accumulate():
    def _accumulate(acc, I):
        return acc.append(pd.DataFrame({'a': [i]}))

    return  ops.scan(_accumulate, seed=pd.DataFrame({'a': []}))

def average_df(observer, scheduler):
    for pid in pids:
        time.sleep(random()*0.8)
        observer.on_next(randint(0, 100))
    observer.on_completed()


# Client
if __name__ == "__main__":
    result = pd.DataFrame({'a': []})

    # Observable
    pids = [1, 2, 3, 4]
    source = rx.create(average_df)

    df = source.pipe(
        accumulate(),
        ops.last(),
    ).run()
print(df)

另外为了更简单,你可以使用rxsci库的to_pandas操作符(免责声明,我是作者)。这是一个使用 rxsci 的解决方案,以及一种更被动的方式来创建源 observable:

from typing import NamedTuple
import time
from random import random, randint
import rx
import rx.operators as ops
import pandas as pd
import rxsci as rs
print(rx.__version__)


class Item(NamedTuple):
    pid: int
        

if __name__ == "__main__":
    result = pd.DataFrame({'a': []})

    # Observable
    pids = [1, 2, 3, 4]
    source = rx.from_(pids).pipe(
        ops.do_action(lambda i: time.sleep(random()*0.8)),
        ops.map(lambda i: Item(pid=randint(0, 100))),
    )
    
    df = source.pipe(
        rs.ops.to_pandas(),
    ).run()
    
print(df)
于 2022-01-24T11:35:43.910 回答