2

我有两个Subjects,一个是带有 ID 的人员对象流,另一个是代表谁与谁交朋友的 ID 的外部参照流。这是一些简化的代码。

class Program {
    static void Main() {
        // Set up observables
        var people = new Subject<Person>();
        var friendMap = new Subject<FriendMap>();
        var friendNotices = from p1 in people
                            from p2 in people
                            from pair in friendMap
                            where p1.Id == pair.Id1 && p2.Id == pair.Id2
                            select p1.Name + " befriended " + p2.Name;

        // Subscribe to log
        friendNotices.Subscribe(Console.WriteLine);

        // Add people
        people.OnNext(new Person(1, "Alice"));
        people.OnNext(new Person(2, "Bob"));

        // Add relationships
        friendMap.OnNext(new FriendMap(1, 2)); // => "Alice befriended Bob"
        friendMap.OnNext(new FriendMap(2, 1)); // Doesn't show up!
    }
}

class Person {
    public Person(int id, string name) {
        Id = id;
        Name = name;
    }
    public string Name;
    public int Id;
}

class FriendMap {
    public FriendMap(int id1, int id2) {
        Id1 = id1;
        Id2 = id2;
    }
    public int Id1;
    public int Id2;
}

我遇到的问题是有时添加外部参照不会导致friendNotice事件。特别是,如果 Id2 的人是在Id1 的人之前创建的,它似乎会失败。

这是 Rx 中的错误还是我的代码中的错误?无论哪种方式,我该如何让它发挥作用?

(“交友”在我的应用程序中是不可交换的——爱丽丝与鲍勃交朋友与鲍勃与爱丽丝交朋友的关系不同,因此“只需交换 ID 并重试”在我的情况下不是一个可用的解决方案)。

4

1 回答 1

3

这里的问题是对 SelectMany 函数如何工作的误解(这是由 linq comprehension 映射到的运算符from ... from ...)。

此构造从源流中获取每个元素并将其投影到目标流上。它通过在目标流上为源的每个元素创建一个订阅来为投影提供服务来实现这一点。

让我们使用一些伪代码来检查一下。只考虑查询的这一部分:

from p1 in people
from p2 in people

目前,让我们将人员流拆分为人员 A 和人员 B:

from p1 in peopleA
from p2 in peopleB

如果我们现在调用:

peopleA.OnNext(Alice);

实际发生的是,将在 peopleB 上创建一个新订阅,以服务 Alice 到 peopleB 的投影。此时,peopleB 中没有元素——因此不会发生投影。

现在如果我们打电话:

peopleB.OnNext(Tom);

Alice -> peopleB 的投影将运行并输出 (Alice, Tom)。

现在调用:

peopleB.OnNext(Dick);

现在来自 Alice -> peopleB 的投影继续,因此 (Alice, Dick) 将被输出。

现在调用:

peopleA.OnNext(Bob);

现在在 peopleB 上为 Bob 开始了一个新的订阅 - 但在 peopleB 发出之前不会输出任何内容。

现在调用:

peopleB.OnNext(Harry);

随着 Alice 和 Bob 订阅都运行,我们将得到 (Alice, Harry) 和 (Bob, Harry);

一个简单的例子供您尝试:

var source = new Subject<string>();
var source2 = new Subject<string>();

var res = from s in source
          from t in source2
          select s + " " + t;

res.Subscribe(Console.WriteLine);

source.OnNext("A"); 
source2.OnNext("1");
source2.OnNext("2");
source.OnNext("B");
source2.OnNext("3");

将给出输出:

A 1
A 2
A 3
B 3

回到“self SelectMany”。现在一切都开始变得有点棘手。关键部分是正在设置的订阅不会捕获触发其设置的“当前”项目。因此,让我们标记 SelectMany A 和 B 的每个部分:

from p1 in people (call this A)
from p2 in people (call this B)

当我们调用时:

people.OnNext(Alice);

A 上的 Alice 的订阅将在 B 上进行 - 但由于它是在 Alice 发出后进行的,所以它不会捕获她,也不会输出任何内容。

现在我们调用:

people.OnNext(Bob);

Alice 的订阅将在 B 上看到 Bob,这会导致 (Alice, Bob) 被输出。将在 B 上创建 A 上 Bob 的订阅,但由于错过了 Bob 的输出,因此不会再输出任何内容。

这就是你所看到的。发出的唯一组合是 (Alice, Bob)。

不过,您可以通过在设置新订阅时让主题对象重播其内容来解决此问题。像这样修改示例的第一部分:

    // Set up observables
    var people = new Subject<Person>();
    var peopleR = people.Replay().RefCount();
    var friendMap = new Subject<FriendMap>();
    var friendNotices = from p1 in peopleR
                        from p2 in peopleR
                        from pair in friendMap
                        where p1.Id == pair.Id1 && p2.Id == pair.Id2
                        select p1.Name + " befriended " + p2.Name;

...但是如果您的流长时间运行当然不建议这样做,因为您将所有内容都缓存在内存中。

就您的特定问题而言,我认为您最好采用不同的方式-但是如果不知道您要达到的目标就很难进行规定,而且这已经是一篇很长的帖子了!一种方法是简单地订阅 FriendMap 条目,然后只需查找朋友的姓名即可输出您想要的消息。(我可能在大局中遗漏了一些东西!)。

希望您了解您的方法的问题。

于 2013-06-07T16:03:45.387 回答