重要提示:有关结果的描述和更多详细信息,请查看我的回答
我需要对通常被复制的一系列对象/事件进行分组和过滤,并用 TimeSpan 间隔对其进行缓冲。我尝试用一些大理石图更好地解释它:
X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z
会产生
X---Y---Z---X---Y---Z
其中 X、Y 和 Z 是不同的事件类型,“---”表示间隔。此外,我还想通过一个关键属性来区分它在所有类型上都可用,因为它们有一个共同的基类:
X, Y, Z : A
A 包含一个属性 Key。使用符号 Xa 表示 X.Key = a,最终样本将是:
X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c
会产生
X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b
任何人都可以帮助我将所需的 Linq 运算符(可能是 DistinctUntilChanged 和 Buffer)放在一起来实现这种行为吗?谢谢
更新 18.08.12:
根据要求,我尝试给出更好的解释。我们有设备收集事件并将其发送到 Web 服务。这些设备具有旧逻辑(由于向后兼容性,我们无法更改它)并且它们不断发送事件直到收到确认;确认后,他们发送队列中的下一个事件,依此类推。事件包含单元的网络地址和一些其他属性,用于区分每个设备的队列中的事件。一个事件如下所示:
class Event
{
    public string NetworkAddress { get; }
    public string EventCode { get; }
    public string AdditionalAttribute { get; }
}
目标是每 5 秒处理一次从所有设备接收到的可区分事件,将信息存储在数据库中(这就是我们不想分批执行此操作的原因)并将 ack 发送到设备。让我们举一个只有两个设备和一些事件的例子:
Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'
Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'
Pn are the operations done by our server, explained later
可能的弹珠图(输入流+输出流):
Device 'a'          : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b'          : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...
Time                : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-
P1: Server stores and acknowledges [a1] and [b1]
P2: "      "      "   "            [b2]
P3: "      "      "   "            [a2] and [b3]
P4: "      "      "   "            [a3] and [b4]
最后我认为这可能是基本运算符的简单组合,但我是 Rx 新手,我有点困惑,因为似乎有很多运算符(或运算符组合)来获得相同的输出流.
更新 19.08.12:
请记住,此代码在服务器上运行,它应该运行数天而不会发生内存泄漏......我不确定受试者的行为。目前,对于每个事件,我都会在服务上调用推送操作,该服务调用主题的 OnNext,我应该在其上构建查询(如果我对主题的使用没有错的话)。
20.08.12 更新:
当前实施,包括验证测试;这是我尝试过的,@yamen 的建议似乎相同
public interface IEventService
{
    // Persists the events
    void Add(IEnumerable<Event> events);
}
public class Event
{
    public string Description { get; set; }
}
/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);
    private readonly Subject<EventMessage> subject = new Subject<EventMessage>();
    private readonly IDisposable subscription;
    private readonly object locker = new object();
    private readonly IEventService eventService;
    /// <summary>
    /// Initializes a new instance of the <see cref="EventManager"/> class.
    /// </summary>
    /// <param name="scheduler">The scheduler.</param>
    public EventManager(IEventService eventService, IScheduler scheduler)
    {
        this.eventService = eventService;
        this.subscription = this.CreateQuery(scheduler);
    }
    /// <summary>
    /// Pushes the event.
    /// </summary>
    /// <param name="eventMessage">The event message.</param>
    public void PushEvent(EventMessage eventMessage)
    {
        Contract.Requires(eventMessage != null);
        this.subject.OnNext(eventMessage);
    }
    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    /// <filterpriority>2</filterpriority>
    public void Dispose()
    {
        this.Dispose(true);
    }
    private void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Dispose unmanaged resources
        }
        this.subject.Dispose();
        this.subscription.Dispose();
    }
    private IDisposable CreateQuery(IScheduler scheduler)
    {
        var buffered = this.subject
            .DistinctUntilChanged(new EventComparer())
            .Buffer(EventHandlingPeriod, scheduler);
        var query = buffered
            .Subscribe(this.HandleEvents);
        return query;
    }
    private void HandleEvents(IList<EventMessage> eventMessages)
    {
        Contract.Requires(eventMessages != null);
        var events = eventMessages.Select(this.SelectEvent);
        this.eventService.Add(events);
    }
    private Event SelectEvent(EventMessage message)
    {
        return new Event { Description = "evaluated description" };
    }
    private class EventComparer : IEqualityComparer<EventMessage>
    {
        public bool Equals(EventMessage x, EventMessage y)
        {
            return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
        }
        public int GetHashCode(EventMessage obj)
        {
            var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
            return s.GetHashCode();
        }
    }
}
public class EventMessage
{
    public string NetworkAddress { get; set; }
    public byte EventCode { get; set; }
    public byte Attribute { get; set; }
    // Other properties
}
和测试:
public void PushEventTest()
    {
        const string Address1 = "A:2.1.1";
        const string Address2 = "A:2.1.2";
        var eventServiceMock = new Mock<IEventService>();
        var scheduler = new TestScheduler();
        var target = new EventManager(eventServiceMock.Object, scheduler);
        var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
        var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
        scheduler.Schedule(() => target.PushEvent(eventMessageA1));
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
        scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));
        scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);
        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());
        scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));
        scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);
        eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
    }
此外,我再次指出,该软件可以无问题地运行数天,处理数千条消息,这一点非常重要。说清楚:测试没有通过当前的实现。