我是所有反应式编程概念的新手,我需要处理以下情况——我有一个运行时间相对较长的消费者逻辑,在这种情况下,可能会在前面的消息仍然存在时产生几条消息正在处理。
我需要跳过这些消息并防止为这些消息调用相关的消费逻辑。这是一个示例(在 C# 中)
IObservable<int> stream = ...
stream.SubscribeOn(TaskPoolScheduler.Default).Subsribe(ProcessMessage, cancellationToken);
假设每 5 毫秒推送一条消息到流中: (1, 2, 3, 4, 5)
该ProcessMessage
方法可能需要大约 10-15ms 才能完成,这意味着当它收到消息时,1
它仍然会在生产时工作。2
3
我需要跳过调用ProcessMessage
并直接赶上2
,或者下一个未处理的消息将是什么。3
4
Reactive 扩展中是否有任何内置结构可以让我处理这种特殊情况?