2

我正在使用 RX 2.2.5。我有 2 个视图加载子订单

           _transportService
            .ObserveSubOrder(parentOrder.OrderId)
            .SubscribeOn(_backgroundScheduler)
            .ObserveOn(_uiScheduler)
            .Where(subOs => subOs != null)                
            .Snoop("BeforeGrpBy")
            .GroupBy(subOs => subOs.OrderId)
            .Subscribe(subOrdUpdates =>
            {
                AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));                        
            })

在 groupBy 获得所有元素序列之前,问题出现在 groupby 之后,它很少错过元素序列。我不认为它的并发问题从日志中可以明显看出。自定义 Snoop 扩展方法用于生成这些日志。

16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})

格式化时间:(线程):消息

正如您在 groupby onNext 被调用两次之前所看到的,但在它错过了一次之后。这里的 Rx 语法有问题还是已知问题?任何见解会有所帮助吗?如果需要进一步澄清,请发表评论。

更新: 添加工作/理想的日志:

16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})

更新 2:可能的错误或功能

GroupBy 仅在 fireNewMapEntry 为真(GroupBy.cs)时才会触发 groupedObservable 并且这发生在这里

 if (!_map.TryGetValue(key, out writer))
 {
    writer = new Subject<TElement>();
    _map.Add(key, writer);
    fireNewMapEntry = true;
  }

其中 _map 的类型为Dictionary<TKey, ISubject<TElement>>。这可能是问题吗?

4

2 回答 2

1

只是关于您的代码风格的一些注释(对不起,这并不是我认为@supertopia 已经回答的真正答案)

  1. 移动您SubscribeOnObserveOn呼吁成为您在最终订阅之前所做的最后一件事。在您当前的代码中,您正在执行Where, theSnoopGroupByall_uiScheduler占用宝贵的周期。

  2. 避免在订阅中订阅。似乎AddIfNew需要一个 key 和一个IObservable<T>,因此我假设它正在内部执行一些订阅。相反,依靠你所知道的。如果您使用 GroupBy,那么您知道第一次生成组时密钥将是唯一的。所以这现在可以只是一个 Add (如果它是您正在检查的键)。Take(1)如果您想明确,也可以使用。如果它不是您正在检查的键的值,那么它GroupBy似乎是多余的。

  3. 尽量使变量名称保持一致,以便在另一个开发人员阅读查询时很好地引导他们,而不是在subOs,childOs和之间跳转childUpdates, whenchildOrder似乎是一个更好的名称 (imo)

  4. 理想情况下,不要在可观察序列中返回空值。它的用途是什么?在极少数情况下这可能有意义,但我经常发现使用 null 而不是 anOnCompleted来表示此序列没有值。

例如

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .GroupBy(childOrder => childOrder.OrderId)
        .SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder))
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(newGroup =>
        {
            Add(newGroup.Item1, newGroup.Item2);                        
        },
          ex=>//obviously we have error handling here ;-)
        );

或者

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(childOrder =>
          {
             AddIfNew(childOrder.OrderId, childOrder);                             
          },
          ex=>//obviously we have error handling here ;-)
        );

甚至更好(没有窥探和空值检查)

var subscription = _transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(
          childOrder => AddIfNew(childOrder.OrderId, childOrder),
          ex=>//obviously we have error handling here ;-)
        );

hth

于 2016-10-27T09:39:21.767 回答
0

您缺少GroupBy.

运算符OnNext仅在出现新组后才发出(请参阅实现GroupBy.cs:67)。在您的情况下,orderID两个通知都相等,因此只OnNext发出一个。

IGroupedObservable<T>如果您需要访问组内的更多通知,则可以订阅操作员发出的值。

于 2016-10-27T06:56:23.683 回答