以下是我为实现这一功能所采用的做法:
// Demo: print an item when the item after it has a different Sender, and also
// print any item that is followed by a 300 ms pause in the input.
var source = new Subject<Pair>();

// Small helpers so the scripted input at the bottom reads as a timeline.
// NOTE(review): Pair is declared elsewhere; presumably its first constructor
// argument is the Sender and the second is the message text - confirm.
Action<string, string> send = (first, second) => source.OnNext(new Pair(first, second));
Action<int> pause = milliseconds => System.Threading.Thread.Sleep(milliseconds);

// Pair every item with its successor. A plain DistinctUntilChanged(p => p.Sender)
// on source would surface the first item of each run of equal senders; keying
// the pairs on the *next* item's Sender and projecting the *previous* item
// surfaces the item that preceded the change instead.
// DistinctUntilChanged always lets the first pair through, so the very first
// item of the stream is surfaced as well.
var successors = source.Skip(1);
var transitions = source.Zip(successors, (previous, next) => new { Previous = previous, Next = next });
var changedSender = transitions
    .DistinctUntilChanged(step => step.Next.Sender)
    .Select(step => step.Previous)
    .Publish()
    .RefCount();

// Publish().RefCount() together with this otherwise-unused subscription
// make the sequence hot.
changedSender.Subscribe();

// Items that were followed by at least 300 ms without another item.
var settled = source.Throttle(TimeSpan.FromMilliseconds(300));

// Either trigger prints the item.
var either = changedSender.Merge(settled);
either.Subscribe(item => item.Print());

// --- scripted input -------------------------------------------------------
send("A", "i");
pause(10);
send("A", "it");
pause(10);
send("A", "bit");
pause(500); // longer than the 300 ms throttle window

send("A", "bite");
send("A", "bite1");
send("A", "bite2");
send("B", "a");
send("B", "ani");
send("B", "animal");
pause(10);
send("A", "bitey");
pause(500); // longer than the 300 ms throttle window

send("A", "at");
send("B", "empty");
send("A", "empty");
send("C", "new");

// Keep the process alive so the throttled item still pending can be printed.
Console.ReadLine();