我有 StreamInsight 服务器
static void Main(string[] args)
// Create an embedded StreamInsight server
using (var server = Server.Create("Default"))
// Create a local end point for the server embedded in this program
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/MyStreamInsightServer");
/* The following entities will be defined and available in the server for other clients:
* serverApp
* serverSource
* serverSink
* serverProcess
// CREATE a StreamInsight APPLICATION in the server
var myApp = server.CreateApplication("serverApp");
// DEFINE a simple SOURCE (returns a point event every second)
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
// DEPLOY the source to the server for clients to use
// Compose a QUERY over the source (return every even-numbered event)
var myQuery = from e in mySource
where e % 2 == 0
select e;
// DEFINE a simple observer SINK (writes the value to the server console)
var mySink = myApp.DefineObserver(() => Observer.Create<long>(x => Console.WriteLine("sink_Server..: {0}", x)));
// DEPLOY the sink to the server for clients to use
// BIND the query to the sink and RUN it
using (var proc = myQuery.Bind(mySink).Run("serverProcess"))
// Wait for the user stops the server
Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");
Console.WriteLine(" ");
StreamInsight 客户端 A:
static void Main(string[] args)
// Connect to the StreamInsight server
using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://localhost/MyStreamInsightServer")))
/* The following entities are expected to be defined in the server:
* serverApp0
* serverSource0
* serverSink0
/* The following entity will be defined in the server by this client:
* serverProcess_Client_A
// Get the existing StreamInsight APPLICATION
var myApp = server.Applications["serverApp"];
// GET the SOURCE from the server
var mySource = myApp.GetStreamable<long>("serverSource");
// Compose a QUERY on the source (return every even-numbered item + 1000)
var myQuery = from e in mySource
where e % 2 == 0
select e + 1000;
// GET the SINK from the server
var mySink = myApp.GetObserver<long>("serverSink");
// BIND the QUERY to the SINK and RUN it
using (var proc = myQuery.Bind(mySink).Run("serverProcess_Client_A"))
// Wait for the user to stop the program
Console.WriteLine("Client A is running, press Enter to exit the client");
Console.WriteLine(" ");
我有另一个客户端 B,我想将客户端 B 的查询绑定到客户端 A 的此进程(该进程的名称为"serverProcess_Client_A")。我不想从客户端 B 创建新进程
另一个问题:我可以将查询绑定到现有的 StreamInsight 进程吗?