0

我想在另一个流中找到匹配项并将其与当前项目结合起来。

numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["2", "1", "3", "5", "4"]

textnumbers_in_stream = rx.defer(rx.from_iterable(numbers_in_char))


def exists_in_words(number, words):
    return words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )


rx \
    .from_iterable(numbers) \
    .pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
    .subscribe(lambda row: print(row))

我希望打印这些:

(1,"1")
(2,"2")
(3,"3")
...

但是我有:

None
None
None
...

有人可以给我一个想法,我做错了什么?

提前谢谢了

4

3 回答 3

1

您在这里缺少退货声明:

def exists_in_words(number, words):
    return words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )

如果你改变它来打印元组:

rx.from_iterable(numbers) \
    .pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
    .subscribe(lambda row: print(row[0], row[1]))

输出是:

<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>
<rx.core.observable.observable.Observable object at 0x7f559b1dfba8> <rx.core.observable.observable.Observable object at 0x7f559b1df198>
<rx.core.observable.observable.Observable object at 0x7f559b1df198> <rx.core.observable.observable.Observable object at 0x7f559b1df240>
<rx.core.observable.observable.Observable object at 0x7f559b1df240> <rx.core.observable.observable.Observable object at 0x7f559b1df898>
<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>

要解决该字符串表示,请查看此问题

于 2022-01-23T12:16:03.773 回答
1
from rx import operators as op
import rx


def exists_in_words(number, words):
    return words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )


def my_on_next(item):
    print(f"####### {item}", flush=True)


def my_on_error(throwable):
    print(throwable)


def my_on_completed():
    print('Done')


numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["1", "2", "3", "4", "5"]

text_as_numbers_in_stream = rx.from_iterable(numbers_in_char)


def print_tuple(x):
    print(f"{x[0]}:{x[1]}", flush=True)
    return x


rx \
    .from_iterable(numbers) \
    .pipe(op.flat_map(lambda number: exists_in_words(number, text_as_numbers_in_stream))) \
    .subscribe(lambda x: my_on_next(x), my_on_error, my_on_completed)

我找到了..我换defer.from_iterable()第二个流..

我通过简单地反转过程来根据第二个流从第一个流中过滤一个项目:

def exists_in_words(number, words):
    return words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )

稍后flatMap()过滤结果,这是一个 Observable。我是凭感觉做的,在java反应堆里做过很多次。基本上是删除它的外壳(观察者对象)并获取它的内容......

和tadaaaaaa ..它的工作原理。

####### (1, '1')
####### (2, '2')
####### (3, '3')
####### (4, '4')
####### (5, '5')
Done
于 2022-02-18T03:51:46.723 回答
0

如果要明智地组合两个可观察元素,可以使用zip运算符:

numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["1", "2", "3", "4", "5"]


d = rx.from_iterable(numbers).pipe(
    ops.zip(rx.from_iterable(numbers_in_char)),
).subscribe(lambda row: print(row))

这将返回预期结果:

(1, '1')
(2, '2')
(3, '3')
(4, '4')
(5, '5')

请注意,当前版本的 RxPY (3.2) 有一个错误会阻止此示例工作。您可以使用 3.1.1 进行测试,或者希望在即将发布的版本中进行测试。

于 2022-01-25T17:54:14.183 回答