如何定期轮询相对静态的源(如数据库)以在 Microsoft StreamInsight 中创建参考流?
这是我尝试过的。我将用户元数据的数据库表示为一个简单的List<UserMetaData>
var referenceData = new List<UserMetaData>()
{
new UserMetaData() { UserId = 1, Name = "Fred Jones", Location = "Seattle" },
new UserMetaData() { UserId = 2, Name = "Bob Murphy", Location = "Portland" }
};
这是 UserMetaData 类
public class UserMetaData
{
public int UserId { get; set; }
public string Name { get; set; }
public string Location { get; set; }
public override string ToString()
{
return string.Format(
"Name: {0}, ID: {1}, Location: {2}",
this.Name,
this.UserId,
this.Location);
}
}
其余示例代码替换标准 StreamInsight 嵌入式部署设置中的省略号。
using (var server = Server.Create("default"))
{
var app = server.CreateApplication("app");
// ...
}
首先,我创建一个这样的心跳:
var heartbeat = app.DefineObservable(
() => Observable.Interval(TimeSpan.FromSeconds(2)));
在实际应用程序中,我可能会将此心跳间隔设为五分钟而不是两秒。无论如何,接下来我希望 heatbeat 触发新用户元数据的数据库查找:
var newUserMeta = app.DefineObservable(
() => heartbeat.SelectMany(_ => referenceData))
.ToPointStreamable(
c => PointEvent.CreateInsert(DateTime.Now, c),
AdvanceTimeSettings.IncreasingStartTime);
IQbservable.SelectMany 扩展应该使IEnumerable<UserMetaData>
我期望的 referenceData 变平。该_
参数丢弃从心跳发出的长。然后ToPointStreamable
将 转换IObservable<UserMetaData>
为IQStreamable
开始时间为现在的点事件。(DateTime.Now 可能不是很 StreamInsight-y)
然后我将它转换为一个信号,通过一个简单的查询运行它,定义一个控制台接收器并部署它。
// Convert to signal
var metaDataSignal = refStream
.AlterEventDuration(e => TimeSpan.MaxValue)
.ClipEventDuration(refStream, (e1, e2) => e1.Name == e2.Name);
// Query
var result = from t in metaDataSignal
select t;
// Define & deploy sink.
var sink = app.DefineObserver(
() => Observer.Create<UserMetaData>(c => Console.WriteLine(c)));
sink.Deploy("sink");
我的最后一步是去Bind
水槽。我还将等待几秒钟以查看我的元数据轮询心跳打印到屏幕上的输出,然后将新UserMetaData
记录添加到我的数据库并等待查看更改是否得到反映。
using (var process = result.Bind(sink).Run("process"))
{
Thread.Sleep(4000);
referenceData.Add(new UserMetaData()
{
UserId = 3,
Name = "Voqk",
Location = "Houston"
});
Console.ReadLine();
}
新的 UserMetaData 记录永远不会反映在输出中
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
(... forever)
我假设正在发生的是我的 UserMetaData 列表正在被序列化并在 SI 服务器上重新创建,因此对我的本地副本所做的任何更改都不会反映。我不知道如何克服这一点。
Mark Simms 早在 2010 年就写了一篇关于在 StreamInsight 中使用参考流的博客文章,解释了如何使用静态数据源,并表示他的下一篇文章将描述使用 SQL Server。
不幸的是,那个帖子从未发生过。
编辑:我已经更改了这篇文章中的课程以匹配 Mark Simms 帖子中的课程,并尝试整理并详细说明我的过程。