我目前正在使用 cassandra 开发 POC。
我想要做的:有可变数量的传感器(预先不知道),每个传感器每秒都会提供一些值。我想做的是计算每秒、分钟、小时等的平均值、最小值、最大值、速度。
我如何建模我的数据:对于有多个列族;raw、avg-5-second、avg-60-second 等。rowid 是传感器 id,例如 machinex:memory。列名是时间戳,列值是度量。
到目前为止我所拥有的:我创建了一个系统,在该系统中为单个传感器(所以单个 rowid)生成数据。我有一些任务为给定的 rowid 获取一片数据并将结果存储在聚合的 columnfamilies 中。
例子:
Cluster cluster = HFactory.getOrCreateCluster("test-cluster", "localhost:9160"); 键空间 keyspace = createKeyspace(cluster, "Measurements");
String machine1 = "foo:dev:192.168.1.1:5701";
String rowId = machine1 + ":operationCount";
DatapointRepository rawRepo = new DatapointRepository(cluster, keyspace, "Measurements");
DatapointRepository avgSecondRepo = new DatapointRepository(cluster, keyspace, "averageSecond");
DatapointRepository avgFiveSecondRepo = new DatapointRepository(cluster, keyspace, "averageFiveSeconds");
DatapointRepository maxFiveSecondRepo = new DatapointRepository(cluster, keyspace, "maxFiveSeconds");
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);
scheduler.scheduleAtFixedRate(
new RollupRunnable(
rawRepo,
avgSecondRepo,
rowId,
"average 1 second",
new AggregateFunctionFactory(AverageFunction.class)),
0, 1, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
new RollupRunnable(
avgSecondRepo,
avgFiveSecondRepo,
rowId,
"average 5 seconds",
new AggregateFunctionFactory(AverageFunction.class)),
0, 5, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
new RollupRunnable(
avgSecondRepo,
maxFiveSecondRepo,
rowId,
"maximum 5 seconds",
new AggregateFunctionFactory(MaximumFunction.class)),
0, 5, TimeUnit.SECONDS);
long startTime = System.currentTimeMillis();
new GenerateMeasurementsThread(rawRepo, machine1).start();
Thread.sleep(30000);
long endTime = System.currentTimeMillis();
System.out.println("average seconds:");
print(avgSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("average 5 seconds:");
print(avgFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("max 5 seconds:");
print(maxFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("finished");
System.exit(0);
因此,如果我有一个传感器(所以只有一个行 id),或者如果我预先知道有哪些传感器,那么一切正常。问题是我的传感器数量不定,新传感器可能会在任何给定时刻出现,而旧传感器可能会停止发送数据。
我的大问题是:如何确定在给定时间段内哪些传感器可用?一旦我知道了这一点,我就可以为每个传感器创建一个聚合任务。