10

我想使用响应式扩展来转换一些消息并在一小段延迟后中继它们。

消息如下所示:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

输出看起来像这样:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

有几个要求:

  • 延迟的长度取决于消息的内容。
  • 每条消息都有一个 GroupId
  • 如果新消息与等待传输的延迟消息具有相同的 GroupId,则应丢弃第一条消息,在新的延迟期后仅传输第二条消息。

给定一个 Observable<InMsg> 和一个 Send 函数:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

我知道我可以使用 Select 来执行转换。

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • 如何应用消息指定延迟?(请注意,这可能/应该导致消息的无序传递。)
  • 如何对具有相同 GroupId 的消息进行重复数据删除?
  • Rx 有能力解决这个问题吗?
  • 有没有其他方法可以解决这个问题?
4

2 回答 2

9

您可以使用GroupBy,IGroupedObservableDelay延迟输出,并Switch确保较新的值替换其组中的先前值:

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

实现注意事项:如果一个分组值在消息发送后(即延迟之后)到达它将开始一个新的延迟

于 2011-01-19T17:09:51.680 回答
2

@Richard Szalay 的回答几乎对我有用(在 .NET Framework 4.6 上使用 .NET Rx 3.1.1),但我必须添加.Merge()到表达式的末尾以组合IObservable<IObservable<OutMsg>>结果,如下所示:

对我来说(在 .NET Framework 4.6 上使用 .NET Rx 3.1.1)修复是添加.Merge()到最后,如下所示:

var deduplicated = inputs
    .GroupBy(input => input)
    .Select(group =>
        group
        .Select(input => Observable.Return(input).Delay(TimeSpan.FromSeconds(5)))
        .Switch())
    .Merge(); // <-- This is added to combine the partitioned results
于 2018-04-02T01:24:09.537 回答