
I need to process events coming from the event log. This is easy with an EventLogWatcher: attach a handler to its EventRecordWritten event. However, the query I am interested in (eventid == 299 || eventid == 500) returns more events than I need. Here is an example of the stream:

Event ID   Correlation ID
299        1AD...  (this is actually a guid)  X1
500        1AD...                             X2
500        1AD...
500        1AD...
299        43B...                             Y1
299        EDB...                             Z1
500        43B...                             Y2
500        EDB...                             Z2
500        43B...
500        43B...

I am interested in each 299 event and the first 500 event that matches its correlation id. I marked them in the stream above. The output collection I want is [X1, X2, Y1, Y2, Z1, Z2], probably as a dictionary keyed by correlation id with a tuple of EventRecord as the value:

{ 1AD.., <X1, X2> }
{ 43B.., <Y1, Y2> }
{ EDB.., <Z1, Z2> }

In general the events might come in order (299 and then 500), but under high concurrency I foresee two 299 events arriving together, followed by their 500 events, so I don't want to rely on the order in which the events arrive. The correlation id, which is the first property of the event (eventRecord.Properties[0]), is the key for correlating them.

I think this can be solved with a state machine, but it would be interesting to see whether anyone can come up with an Rx solution expressed as a query over an observable. That would keep the pattern matching logic in a single place.

UPDATE: here is the solution I ended up with. Thanks Gideon, it was exactly the starting point I needed!

// events is presumably the observable built from EventLogWatcher.EventRecordWritten.
var pairs = events
    .Where(e299 => e299.EventArgs.EventRecord.Id == 299)
    // For each 299, take the first later 500 with the same correlation id.
    .SelectMany(
        e299 => events
            .Where(e500 => e500.EventArgs.EventRecord.Id == 500 &&
                           e299.EventArgs.EventRecord.Properties[0].Value.ToString() ==
                           e500.EventArgs.EventRecord.Properties[0].Value.ToString())
            .Take(1),
        (e299, e500) => new { First = e299, Second = e500 });

Thanks in advance, Matias


3 Answers


If the 299 events always come before the 500 events, SelectMany and Where should be enough.

// events is an observable of your event type, declared somewhere else;
// eventid and Correlation stand in for however you read those values.
var pairs = from e299 in events
            where e299.eventid == 299
            from e500 in events
            where e500.eventid == 500 &&
                  e299.Correlation == e500.Correlation
            select new { Correlation = e299.Correlation,
                         First = e299,
                         Second = e500 };

If your source has multiple 500 events for each correlated 299 event, you may need to switch to lambda syntax and add a Take(1) to the second subscription.

Answered 2012-04-11T17:42:46.997

You should look at Observable.When together with the join patterns in System.Reactive.Joins. They let you compose complex join patterns like the one you describe.

For an example, see https://stackoverflow.com/a/3868608/13131 and the Guide to System.Reactive.Joins.

Answered 2012-04-11T20:16:16.063

I don't know what you mean by "a state machine".

But you could write an observer that collects the events matching (eventid == 299 || eventid == 500). Using some throttling strategy (keep the latest 1000 events, or the latest minute, I don't know), it raises an event when it detects that a 299 event and a 500 event have the same guid, and then removes them from its internal dictionary/list of events.

Then that kind of observer IS THE observable for the rest of your system.
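A rough sketch of such a collecting observer, in plain C# without Rx, might look like the code below. LogEvent, PairCorrelator, Demo and the "7F0" correlation id are hypothetical names made up for illustration; LogEvent only stands in for the values you would read from an EventRecord (Id and Properties[0]). Unlike the description above, this version keeps a completed slot instead of removing it, so that the extra 500 events for the same guid are ignored rather than starting a new entry.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the values read from an EventRecord:
// Id is the event id, CorrelationId is Properties[0].Value.ToString(),
// Label only exists so the demo can name events X1, X2, ...
public sealed class LogEvent
{
    public LogEvent(int id, string correlationId, string label)
    {
        Id = id; CorrelationId = correlationId; Label = label;
    }
    public int Id { get; }
    public string CorrelationId { get; }
    public string Label { get; }
}

public sealed class PairCorrelator
{
    // One slot per correlation id; a slot is complete once both halves are set.
    private sealed class Slot { public LogEvent First; public LogEvent Second; }

    private readonly Dictionary<string, Slot> _slots = new Dictionary<string, Slot>();

    // Raised once per correlation id, when the 299 and its first 500 are both known.
    public event Action<LogEvent, LogEvent> PairFound;

    public void OnNext(LogEvent e)
    {
        if (e.Id != 299 && e.Id != 500) return;

        Slot slot;
        if (!_slots.TryGetValue(e.CorrelationId, out slot))
            _slots[e.CorrelationId] = slot = new Slot();

        // Pair already reported: ignore the extra 500s (and any repeated 299).
        if (slot.First != null && slot.Second != null) return;

        // Keep only the first 299 and the first 500, whichever arrives first.
        if (e.Id == 299 && slot.First == null) slot.First = e;
        else if (e.Id == 500 && slot.Second == null) slot.Second = e;

        if (slot.First != null && slot.Second != null)
            PairFound?.Invoke(slot.First, slot.Second);
    }
}

public static class Demo
{
    // Feeds the start of the stream from the question, plus one made-up
    // out-of-order pair ("7F0"), and returns the pairs in the order found.
    public static List<string> Run()
    {
        var found = new List<string>();
        var correlator = new PairCorrelator();
        correlator.PairFound += (first, second) => found.Add(first.Label + "+" + second.Label);

        correlator.OnNext(new LogEvent(299, "1AD", "X1"));
        correlator.OnNext(new LogEvent(500, "1AD", "X2"));
        correlator.OnNext(new LogEvent(500, "1AD", "extra"));
        correlator.OnNext(new LogEvent(299, "43B", "Y1"));
        correlator.OnNext(new LogEvent(299, "EDB", "Z1"));
        correlator.OnNext(new LogEvent(500, "43B", "Y2"));
        correlator.OnNext(new LogEvent(500, "EDB", "Z2"));
        correlator.OnNext(new LogEvent(500, "43B", "extra"));
        correlator.OnNext(new LogEvent(500, "7F0", "W2")); // 500 arrives first
        correlator.OnNext(new LogEvent(299, "7F0", "W1"));
        return found;
    }
}
```

Demo.Run() should return X1+X2, Y1+Y2, Z1+Z2 and W1+W2, in that order. Note that the sketch never evicts slots and is not thread-safe, so a real version would still need the size or time limit mentioned above, plus a lock if events arrive on several threads.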

Maybe a list could collect the events, and when a new event is detected, something like

events.Where(e => e.guid == newevent.guid && ((e.eventid == 299 && newevent.eventid == 500) || (e.eventid == 500 && newevent.eventid == 299)))

could perform well enough, instead of keeping a dictionary keyed by guid.

See you later, Mathew!

Answered 2012-04-11T17:14:51.777