0

我想考虑基于有效负载的最新事件,即 StockTicksData id 并忽略时间窗口内的任何重复项

有效载荷和查询

var StockTicksData = new[]
{
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 01), Price = 100, ID = "000361105"    }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 02), Price = 200, ID = "000361105" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 03), Price = 3000, ID = "000361105" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 04), Price = 100, ID = "001055102" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 05), Price = 700, ID = "001084102" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 06), Price = 500, ID = "001084102" }, 
new { Timestamp = new DateTime(2012, 5, 9, 1, 00, 07), Price = 100, ID = "001084102" }, 
};
var stocks = StockTicksData.ToPointStream(Application, t =>
PointEvent.CreateInsert(t.Timestamp, t),
AdvanceTimeSettings.IncreasingStartTime);

var query = (from e in stocks
        group e by e.ID into ipGroup
        from win in ipGroup.TumblingWindow(TimeSpan.FromSeconds(2), 
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new 
        {   
             CusipID = cusipGroup.Key,
             Timestamp = win.Max(e => e.Timestamp),
             Price = 0
        });

var cusipIdGroupCepStream = (from px in query
                         join lz in stocks
                         on new { px.CusipID, px.Timestamp }
                         equals new { lz.CusipID, lz.Timestamp }
                         select new 
                           {
                              CusipId = lz.CusipID,
                              Price = lz.Price,
                              TimeofArrival = lz.Timestamp
                           });

上面的查询工作正常,但是当我使用输入适配器时,我必须插入一个 cti 事件来刷新输出。这是代码

Ticks Generator 它的到达时间为

priceTick.TimeofArrival = DateTime.Now.AddTicks(1);

输入适配器

.... In a loop
{
currEvent = CreateInsertEvent();
currEvent.StartTime = priceTick.TimeofArrival; **// each event has time arrival from input which is t+1**
currEvent.Payload = new PriceTick { Id = priceTick.Id, Price = priceTick.Price, TimeofArrival = priceTick.TimeofArrival };
  pendingEvent = null;

 Enqueue(ref currEvent);

 // Also send an CTI event
 EnqueueCtiEvent(priceTick.TimeofArrival.AddTicks(1)); **// Added to flush the output**
 }

相同的查询没有给出带有输入适配器和 cti 事件的预期输出

任何帮助将不胜感激。

4

1 回答 1

0

上述查询在 LinqPad 中有效,因为没有要生成的 CTI 事件。使用输入/输出适配器配置时,相同的查询不会吐出完整的结果集。

为了刷新您需要配置的输出,通过输入工厂中的 ITypedDeclareAdvanceTimeProperties 生成 CTI 事件。它负责根据设置生成 CTI 事件。我正在使用以下配置

公共 AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties(StockTickerInputConfig configInfo, EventShape eventShape) { return new AdapterAdvanceTimeSettings(new AdvanceTimeGenerationSettings(configInfo.CtiFrequency, TimeSpan.FromTicks(1)), AdvanceTimePolicy.Adjust); }

在每秒数据频率非常高的 Point 事件的情况下,它会丢弃/不将事件刷新到输出适配器。我必须手动将 LAST CTI 事件排入队列,以确保将结果刷新到输出适配器。

在我的最后一个数据馈送/事件入队后,输入工厂中的当前设置不负责生成最后一个 CTI。我在输入适配器中创建了一个条件来检查这是否是最后一个要入队的事件,然后通过 EnqueueCtiEvent 将 cti 事件入队

如果需要,我可以提供代码。让我知道任何人都有更好的方法来解决这个问题。

于 2012-05-17T05:37:10.533 回答