0

使用 Python 中的 ReactiveX,我如何总结 Observables 流?

我有一个字典流是 {"user": "...", "date": ...}。我想创建一个函数,我可以应用该函数为每个用户累积具有最新日期的字典,然后在流结束时发出累积的 observables(它就像 max,但必须查看用户字段,并且会发出多个值)。

示例 - 输入流:

{ "user": "a", "date": "2017-02-14" }
{ "user": "b", "date": "2016-01-01" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "a", "date": "2017-01-01" }
{ "user": "b", "date": "2017-01-01" }

预期输出(顺序无关紧要)

{ "user": "a", "date": "2017-02-14" }
{ "user": "c", "date": "2015-01-01" }
{ "user": "b", "date": "2017-01-01" }

我在https://ninmesara.github.io/RxPY/api/operators/index.html阅读了“Filtering Observables”、“Transforming Observables”、“Combining Observables”和“Decision Tree of Observable Operators” ,并查看了 reduce /aggregate(仅在末尾发出单个值)和 flat_map(不知道如何检测流的结尾)。many_select 和 window (尤其是)看起来很有希望,但我很难理解它们。

我如何使用 rx 来做到这一点(通过使用现有的运算符之一,或者通过制作自定义运算符 [我还不知道该怎么做]?)

4

2 回答 2

0

我认为以下内容可能会满足您的要求。

import rx

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .group_by(lambda x: x['user']) \
        .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
        .subscribe(print)
于 2017-02-17T15:12:53.870 回答
0

汉斯的答案很接近,只需要稍作调整。

我的观察者希望得到{ 'user': ..., 'date': }字典:

import rx

def pp1(x):
    print type(x), x

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .map(lambda x: x[0]) \
        .subscribe(pp1)

产量

<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2016-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
<type 'dict'> {'date': '2017-01-01', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}

执行 .group_by 和 .flat_map 会导致观察者最后获得包含摘要的长度为 1 的列表,而不仅仅是摘要。

import rx

def pp1(x):
    print type(x), x

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .group_by(lambda x: x['user']) \
        .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
        .subscribe(pp1)

产量

<type 'list'> [{'date': '2017-02-14', 'user': 'a'}]
<type 'list'> [{'date': '2017-01-01', 'user': 'b'}]
<type 'list'> [{'date': '2015-01-01', 'user': 'c'}]

需要添加地图:

import rx

def pp1(x):
    print type(x), x

rx.Observable.from_([
    { "user": "a", "date": "2017-02-14" },
    { "user": "b", "date": "2016-01-01" },
    { "user": "c", "date": "2015-01-01" },
    { "user": "a", "date": "2017-01-01" },
    { "user": "b", "date": "2017-01-01" }]) \
        .group_by(lambda x: x['user']) \
        .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \
        .map(lambda x: x[0]) \
        .subscribe(pp1)

这产生了预期的

<type 'dict'> {'date': '2017-02-14', 'user': 'a'}
<type 'dict'> {'date': '2017-01-01', 'user': 'b'}
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}
于 2017-02-17T21:41:26.763 回答