@Enigmativity 的答案的一种扩展。我用它来解决问题:
public static IObservable<(Action ready, IReadOnlyList<T> values)> BufferUntilReady<T>(this IObservable<T> stream)
{
var gate = new BehaviorSubject<Guid>(Guid.NewGuid());
void Ready() => gate.OnNext(Guid.NewGuid());
return stream.Publish(shared => shared
.Buffer(gate.CombineLatest(shared, ValueTuple.Create)
.DistinctUntilChanged(new AnyEqualityComparer<Guid, T>()))
.Where(x => x.Any())
.Select(x => ((Action) Ready, (IReadOnlyList<T>) x)));
}
public class AnyEqualityComparer<T1, T2> : IEqualityComparer<(T1 a, T2 b)>
{
public bool Equals((T1 a, T2 b) x, (T1 a, T2 b) y) => Equals(x.a, y.a) || Equals(x.b, y.b);
public int GetHashCode((T1 a, T2 b) obj) => throw new NotSupportedException();
}
订阅者接收到准备好接收下一个缓冲区时调用的 Ready() 函数。我没有在同一个线程上观察每个缓冲区以避免循环,但我想你可以在其他地方打破它,如果你需要在同一个线程上处理每个缓冲区。