0

如何定期轮询相对静态的源(如数据库)以在 Microsoft StreamInsight 中创建参考流?

这是我尝试过的。我将用户元数据的数据库表示为一个简单的List<UserMetaData>

var referenceData = new List<UserMetaData>()
    {
        new UserMetaData() { UserId = 1, Name = "Fred Jones", Location = "Seattle" },
        new UserMetaData() { UserId = 2, Name = "Bob Murphy", Location = "Portland" }
    };

这是 UserMetaData 类

public class UserMetaData
{
    public int UserId { get; set; }
    public string Name { get; set; }
    public string Location { get; set; }

    public override string ToString()
    {
        return string.Format(
            "Name: {0}, ID: {1}, Location: {2}",
            this.Name,
            this.UserId,
            this.Location);
    }
}

其余示例代码替换标准 StreamInsight 嵌入式部署设置中的省略号。

using (var server = Server.Create("default"))
{
    var app = server.CreateApplication("app");
    // ...
}

首先,我创建一个这样的心跳:

var heartbeat = app.DefineObservable(
                        () => Observable.Interval(TimeSpan.FromSeconds(2)));

在实际应用程序中,我可能会将此心跳间隔设为五分钟而不是两秒。无论如何,接下来我希望 heatbeat 触发新用户元数据的数据库查找:

var newUserMeta = app.DefineObservable(
                        () => heartbeat.SelectMany(_ => referenceData))
                    .ToPointStreamable(
                        c => PointEvent.CreateInsert(DateTime.Now, c),
                        AdvanceTimeSettings.IncreasingStartTime);

IQbservable.SelectMany 扩展应该使IEnumerable<UserMetaData>我期望的 referenceData 变平。该_参数丢弃从心跳发出的长。然后ToPointStreamable将 转换IObservable<UserMetaData>IQStreamable开始时间为现在的点事件。(DateTime.Now 可能不是很 StreamInsight-y)

然后我将它转换为一个信号,通过一个简单的查询运行它,定义一个控制台接收器并部署它。

// Convert to signal
var metaDataSignal = refStream
                    .AlterEventDuration(e => TimeSpan.MaxValue)
                    .ClipEventDuration(refStream, (e1, e2) => e1.Name == e2.Name);

// Query
var result = from t in metaDataSignal
                 select t;

// Define & deploy sink.
var sink = app.DefineObserver(
                    () => Observer.Create<UserMetaData>(c => Console.WriteLine(c)));
sink.Deploy("sink");

我的最后一步是去Bind水槽。我还将等待几秒钟以查看我的元数据轮询心跳打印到屏幕上的输出,然后将新UserMetaData记录添加到我的数据库并等待查看更改是否得到反映。

using (var process = result.Bind(sink).Run("process"))
{
    Thread.Sleep(4000);

    referenceData.Add(new UserMetaData() 
                            {
                                UserId = 3, 
                                Name = "Voqk", 
                                Location = "Houston" 
                            });

    Console.ReadLine();
}

新的 UserMetaData 记录永远不会反映在输出中

Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland

(... forever)

我假设正在发生的是我的 UserMetaData 列表正在被序列化并在 SI 服务器上重新创建,因此对我的本地副本所做的任何更改都不会反映。我不知道如何克服这一点。

Mark Simms 早在 2010 年就写了一篇关于在 StreamInsight 中使用参考流的博客文章,解释了如何使用静态数据源,并表示他的下一篇文章将描述使用 SQL Server。

不幸的是,那个帖子从未发生过。

编辑:我已经更改了这篇文章中的课程以匹配 Mark Simms 帖子中的课程,并尝试整理并详细说明我的过程。

4

2 回答 2

0

你的假设是正确的。.NET 类不进入 StreamInsight 引擎;您的类仅用于模式(有效负载的形状)。那么......您如何处理更改的参考数据?首先,您的源需要定期刷新。这段时间取决于您期望数据更改的频率。然后,对于参考流,无论数据如何,您都需要使用计时器将 CTI 排入队列(以使其保持移动) - 或者 - 您需要一种从数据流中引入 CTI 的方法。第一种方法最简单,但第二种方法更灵活,因为它将参考流与您在数据流中使用的任何时间戳联系起来,并且可以在重放场景中工作,而不仅仅是实时场景。最后,您需要让您的参考事件过期并在添加新参考数据时被替换。这是使用“To Signal”模式(Alter/Clip)完成的。同样,您可以在这里选择。如果您的参考源足够“智能”以仅将更改排入队列,则可以将参考事件的生命周期更改为 TimeSpan.MaxValue,然后参考数据在取消之前一直有效。但是,如果您只想重新加载所有参考事件,则可以将事件持续时间更改为比刷新率稍长,然后进行剪辑。此方法还允许从流中删除参考事件(在删除等情况下)。参考数据的最终挑战是如何处理时间戳。在大多数示例场景中,数据时间线是基于系统时钟的……情况并非总是如此。而且,即使在这些情况下,你也可以“错过” 由于引用事件的竞争条件,一些启动时的连接仍然在排队,而数据事件已经在泵送。在这种情况下,为参考数据使用一个荒谬的早期开始日期(1970 年 1 月 1 日)和一个荒谬的结束日期(2100 年 1 月 1 日)并将队列作为间隔非常有效。但是,在这种情况下,您绝对需要从数据流中导入 CTI,修改参考事件的开始日期,以免它们违反导入的 CTI 和其他同步任务……您自己。适配器/查询模型很好地处理了这个问题,但反应式模型没有……但是,使用反应式模型,您可以使用主题来真正微调所有这些工作的方式,从而使其变得更加灵活。1970 年)用于参考数据和一个荒谬的晚结束日期(2100 年 1 月 1 日)并作为间隔排队。但是,在这种情况下,您绝对需要从数据流中导入 CTI,修改参考事件的开始日期,以免它们违反导入的 CTI 和其他同步任务……您自己。适配器/查询模型很好地处理了这个问题,但反应式模型没有……但是,使用反应式模型,您可以使用主题来真正微调所有这些工作的方式,从而使其变得更加灵活。1970 年)用于参考数据和一个荒谬的晚结束日期(2100 年 1 月 1 日)并作为间隔排队。但是,在这种情况下,您绝对需要从数据流中导入 CTI,修改参考事件的开始日期,以免它们违反导入的 CTI 和其他同步任务……您自己。适配器/查询模型很好地处理了这个问题,但反应式模型没有……但是,使用反应式模型,您可以使用主题来真正微调所有这些工作的方式,从而使其变得更加灵活。

于 2015-01-11T16:29:38.897 回答
0

作为测试,我var referenceData = List<UserMetaData>()...从 main 中移出并将其声明为静态成员而不是局部变量。

class Program
{
    // *NOW STATIC*
    private static List<UserMetaData> referenceData = new List<UserMetaData>()
    {
        new UserMetaData() {UserId = 1, Name = "Fred Jones", Location = "Seattle"},
        new UserMetaData() {UserId = 2, Name = "Bob Murphy", Location = "Portland"}
    };

    public static void Main(string[] args)
    {
        // ...

现在数据库中的更改反映在输出中......

Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Voqk, ID: 3, Location: Houston
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Voqk, ID: 3, Location: Houston
于 2015-01-15T05:44:59.233 回答