我正在使用 .NET 响应式构建生产者..消费者模式。生产者从 Kafka 消息总线读取消息。一旦消息被读取,就需要交给消费者来处理消息。
我能够使用 .NET 反应式扩展(可观察和观察)来做到这一点。但是,我想处理从总线读取消息速度更快而消费者落后的情况。我希望 observable 对背压做出反应,即如果 observable 表明它仍在处理较早的消息,请减慢 observable。
// Create a subject of my custom object and make it observable
private readonly Subject<MyMessage> messageSubject = new Subject<MyMessage>();
messageSubject.AsObservable();
// add observer to the subject
_onMessageObservable.ObserveOn(NewThreadScheduler.Default)
// On receive of message, notify the observer
messageSubject.OnNext(msg.Value);