1

我是所有反应式编程概念的新手,我需要处理以下情况——我有一个运行时间相对较长的消费者逻辑,在这种情况下,可能会在前面的消息仍然存在时产生几条消息正在处理。

我需要跳过这些消息并防止为这些消息调用相关的消费逻辑。这是一个示例(在 C# 中)

IObservable<int> stream  = ...
stream.SubscribeOn(TaskPoolScheduler.Default).Subsribe(ProcessMessage, cancellationToken);

假设每 5 毫秒推送一条消息到流中: (1, 2, 3, 4, 5)

ProcessMessage方法可能需要大约 10-15ms 才能完成,这意味着当它收到消息时,1它仍然会在生产时工作。23

我需要跳过调用ProcessMessage并直接赶上2,或者下一个未处理的消息将是什么。34

Reactive 扩展中是否有任何内置结构可以让我处理这种特殊情况?

4

0 回答 0