4

我有一个系统中发生的事件列表。我的目标是获取事件列表并创建系列的滑动窗口以确定事件发生率。事件从问题范围之外的应用程序加载到事件列表中。

因为系统可以同时接收来自多个源的事件,所以一些事件发生的时间戳(我用作系列键的值)是相同的。实现这一目标的正确方法是什么?

这是我得到的错误:

An unhandled exception of type 'System.ArgumentException' occurred in Deedle.dll

Additional information: Duplicate key '6/12/2015 3:14:43 AM'. Duplicate keys are not allowed in the index.

我的代码:

let mutable events = new ResizeArray<StreamEvent>()
let getSeries =
    let eventsKvp = events |>  Seq.map(fun(event) -> new KeyValuePair<DateTime,StreamEvent>(event.OccuredAt,event))
        let series = Series(eventsKvp)
    series |> Series.windowDist (TimeSpan(0, 0, 0,30))

更新#1

这里没有描述的是一些 C# 代码,它实例化了一些 F# Stream 对象并通过 Stream.ProcessEvent 方法添加事件。该代码对我在这里遇到的问题并不重要。

我不再遇到重复密钥问题,但出现Additional information: Floating window aggregation and chunking is only supported on ordered indices.错误。

更新 #2 我需要使用 sortByKey 而不是排序。

这是我的 F# 代码:

namespace Storck.Data
open System
open System.Collections.Generic
open Deedle

type EventType =
    | ClientConnected
    | ClientDisconnect

type Edge(id:string,streamId:string) = 
    member this.Id = id
    member this.StreamId = streamId
    member this.Edges =  new ResizeArray<Edge>() 

type StreamEvent(id:string,originStreamId:string,eventType:EventType,ocurredAt:DateTime) = 
    member this.Id = id
    member this.Origin = originStreamId
    member this.EventType = eventType
    member this.OccuredAt = ocurredAt
    override this.Equals(o) =
        match o with
        | :? StreamEvent as sc -> this.Id = sc.Id
        | _ -> false
    override this.GetHashCode() =
        id.GetHashCode()
    interface System.IComparable with
        member this.CompareTo(o) =
            match o with
            | :? StreamEvent as sc -> compare this.Id sc.Id
            | _ -> -1

type Client(id:string) = 
    member this.Id=id
type Key = 
  | Key of DateTime * string
  static member (-) (Key(a, _), Key(b, _)) = a - b
  override x.ToString() = let (Key(d, s)) = x in d.ToString() + ", " + s

  type Stream(id:string, origin:string) = 
    let mutable clients = new   ResizeArray<Client>()
    let mutable events = new ResizeArray<StreamEvent>()

    member this.Events =  clients.AsReadOnly()
    member this.Clients = clients.AsReadOnly()
    member this.Id = id
    member this.Origin = origin
    member this.Edges =  new ResizeArray<Edge>() 
    member this.ProcessEvent(client:Client,event:StreamEvent)  =  
        match event.EventType with
            |EventType.ClientConnected -> 
                events.Add(event)
                clients.Add(client)
                true
            |EventType.ClientDisconnect -> 
                events.Add(event)
                let clientToRemove = clients |> Seq.find(fun(f)-> f.Id = client.Id)
                clients.Remove(clientToRemove)
    member this.GetSeries() =       
        let ts = series [ for e in events -> Key(e.OccuredAt, e.Id) => e ]
        ts |> Series.sortByKey |> Series.windowDist (TimeSpan(0, 0, 0,30))
4

1 回答 1

5

我们在 Deedle 中做出的设计决策之一是可以将序列视为连续序列(而不是事件序列),因此 Deedle 不允许重复键(这对事件有意义,但对时间序列没有意义)。

我希望对您的场景之类的事情有更好的支持——这是我们正在为下一个版本考虑的事情,但我不确定如何最好地做到这一点。

正如 Fyodor 在评论中建议的那样,您可以使用由日期和某些内容(源或只是序数索引)组成的唯一索引。

如果你-在你的键上定义了操作符,那么你甚至可以使用这个windowDist函数:

type StreamEvent = { OccuredAt : DateTime; Source : string; Value : int }

/// A key combines date with the source and defines the 
/// (-) operator which subtracts the dates returning TimeSpan
type Key = 
  | Key of DateTime * string
  static member (-) (Key(a, _), Key(b, _)) = a - b
  override x.ToString() = let (Key(d, s)) = x in d.ToString() + ", " + s

现在我们可以创建一堆示例事件:

let events = 
  [ { OccuredAt = DateTime(2015,1,1,12,0,0); Source = "one"; Value = 1 }
    { OccuredAt = DateTime(2015,1,1,12,0,0); Source = "two"; Value = 2 }
    { OccuredAt = DateTime(2015,1,1,13,0,0); Source = "one"; Value = 3 } ]

series在这里,我将使用Deedle 运算符的内置函数=>来创建将键映射到值的系列:

let ts = series [ for e in events -> Key(e.OccuredAt, e.Source) => e ]

我们甚至可以使用该windowDist功能,因为键类型支持-

ts |> Series.windowDist (TimeSpan(0, 0, 0,30))
于 2015-06-12T04:13:12.463 回答