我想使用响应式扩展来转换一些消息并在一小段延迟后中继它们。
消息如下所示:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
输出看起来像这样:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
有几个要求:
- 延迟的长度取决于消息的内容。
- 每条消息都有一个 GroupId
- 如果新消息与等待传输的延迟消息具有相同的 GroupId,则应丢弃第一条消息,在新的延迟期后仅传输第二条消息。
给定一个 Observable<InMsg> 和一个 Send 函数:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
我知道我可以使用 Select 来执行转换。
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- 如何应用消息指定延迟?(请注意,这可能/应该导致消息的无序传递。)
- 如何对具有相同 GroupId 的消息进行重复数据删除?
- Rx 有能力解决这个问题吗?
- 有没有其他方法可以解决这个问题?