I want to keep only the latest event per payload key, i.e. the StockTicksData ID, and ignore any duplicates within the time window.

Payload and query:
```csharp
var StockTicksData = new[]
{
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 01), Price = 100, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 02), Price = 200, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 03), Price = 3000, ID = "000361105" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 04), Price = 100, ID = "001055102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 05), Price = 700, ID = "001084102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 06), Price = 500, ID = "001084102" },
    new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 07), Price = 100, ID = "001084102" },
};

// Turn the array into a stream of point events ordered by start time.
var stocks = StockTicksData.ToPointStream(Application, t =>
    PointEvent.CreateInsert(t.Timestamp, t),
    AdvanceTimeSettings.IncreasingStartTime);

// For each ID, find the latest timestamp within every 2-second tumbling window.
var query = (from e in stocks
             group e by e.ID into cusipGroup
             from win in cusipGroup.TumblingWindow(TimeSpan.FromSeconds(2),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new
             {
                 CusipID = cusipGroup.Key,
                 Timestamp = win.Max(e => e.Timestamp),
                 Price = 0
             });

// Join back to the original stream to recover the price of that latest tick.
var cusipIdGroupCepStream = (from px in query
                             join lz in stocks
                             on new { px.CusipID, px.Timestamp }
                             equals new { CusipID = lz.ID, lz.Timestamp }
                             select new
                             {
                                 CusipId = lz.ID,
                                 Price = lz.Price,
                                 TimeofArrival = lz.Timestamp
                             });
```
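
To make the intent concrete, the result I am after corresponds roughly to the plain LINQ to Objects sketch below. It is not StreamInsight code: `LatestPerWindow.Latest` is a hypothetical helper name, and it assumes the windows are aligned to multiples of the window size counted from `DateTime.MinValue`, which may not match the engine's actual window alignment.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LatestPerWindow
{
    // Hypothetical helper (not a StreamInsight API): for each ID and each
    // fixed-size time bucket, keep only the tick with the greatest timestamp.
    public static List<(string Id, DateTime Timestamp, int Price)> Latest(
        IEnumerable<(string Id, DateTime Timestamp, int Price)> ticks,
        TimeSpan window)
    {
        return ticks
            // Bucket index = number of whole windows elapsed since DateTime.MinValue (assumed alignment).
            .GroupBy(t => (t.Id, Bucket: t.Timestamp.Ticks / window.Ticks))
            // Within one (ID, bucket) group, the latest tick wins; earlier ones are dropped as duplicates.
            .Select(g => g.OrderByDescending(t => t.Timestamp).First())
            .OrderBy(t => t.Timestamp)
            .ToList();
    }
}
```

Called with the ticks above and `TimeSpan.FromSeconds(2)`, this returns at most one tick per ID per 2-second bucket.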
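
The StreamInsight query above works correctly, but when I use an input adapter I have to insert a CTI event to flush the output. Here is the code.

The ticks generator sets the arrival time as:

```csharp
priceTick.TimeofArrival = DateTime.Now.AddTicks(1);
```

Input adapter:
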
```csharp
// .... In a loop
{
    currEvent = CreateInsertEvent();
    currEvent.StartTime = priceTick.TimeofArrival; // each event carries the arrival time from the input, which is t+1
    currEvent.Payload = new PriceTick { Id = priceTick.Id, Price = priceTick.Price, TimeofArrival = priceTick.TimeofArrival };
    pendingEvent = null;
    Enqueue(ref currEvent);
    // Also send a CTI event
    EnqueueCtiEvent(priceTick.TimeofArrival.AddTicks(1)); // added to flush the output
}
```
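
The same query does not produce the expected output when it runs with the input adapter and the CTI events.

Any help would be greatly appreciated.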