0

我目前正在使用 cassandra 开发 POC。

我想要做的:有可变数量的传感器(预先不知道),每个传感器每秒都会提供一些值。我想做的是计算每秒、分钟、小时等的平均值、最小值、最大值、速度。

我如何建模我的数据:对于有多个列族;raw、avg-5-second、avg-60-second 等。rowid 是传感器 id,例如 machinex:memory。列名是时间戳,列值是度量。

到目前为止我所拥有的:我创建了一个系统,在该系统中为单个传感器(所以单个 rowid)生成数据。我有一些任务为给定的 rowid 获取一片数据并将结果存储在聚合的 columnfamilies 中。

例子:

Cluster cluster = HFactory.getOrCreateCluster("test-cluster", "localhost:9160"); 键空间 keyspace = createKeyspace(cluster, "Measurements");

String machine1 = "foo:dev:192.168.1.1:5701";
String rowId = machine1 + ":operationCount";

DatapointRepository rawRepo = new DatapointRepository(cluster, keyspace, "Measurements");
DatapointRepository avgSecondRepo = new DatapointRepository(cluster, keyspace, "averageSecond");
DatapointRepository avgFiveSecondRepo = new DatapointRepository(cluster, keyspace, "averageFiveSeconds");
DatapointRepository maxFiveSecondRepo = new DatapointRepository(cluster, keyspace, "maxFiveSeconds");

ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);
scheduler.scheduleAtFixedRate(
        new RollupRunnable(
        rawRepo,
        avgSecondRepo,
        rowId,
        "average 1 second",
                new AggregateFunctionFactory(AverageFunction.class)),
        0, 1, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
        new RollupRunnable(
        avgSecondRepo,
        avgFiveSecondRepo,
        rowId,
        "average 5 seconds",
                new AggregateFunctionFactory(AverageFunction.class)),
        0, 5, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
        new RollupRunnable(
        avgSecondRepo,
        maxFiveSecondRepo,
        rowId,
        "maximum 5 seconds",
                new AggregateFunctionFactory(MaximumFunction.class)),
        0, 5, TimeUnit.SECONDS);


long startTime = System.currentTimeMillis();

new GenerateMeasurementsThread(rawRepo, machine1).start();

Thread.sleep(30000);

long endTime = System.currentTimeMillis();

System.out.println("average seconds:");
print(avgSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("average 5 seconds:");
print(avgFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("max 5 seconds:");
print(maxFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");


System.out.println("finished");
System.exit(0);

因此,如果我有一个传感器(所以只有一个行 id),或者如果我预先知道有哪些传感器,那么一切正常。问题是我的传感器数量不定,新传感器可能会在任何给定时刻出现,而旧传感器可能会停止发送数据。

我的大问题是:如何确定在给定时间段内哪些传感器可用?一旦我知道了这一点,我就可以为每个传感器创建一个聚合任务。

4

2 回答 2

0

“我最大的问题是:我如何才能确定在给定时间段内哪些传感器可用?一旦我知道了,我就可以为每个传感器创建一个聚合任务。”

到目前为止,您所做的是按传感器 THEN 时间戳(sensorId=rowId,timestamp = column name)索引数据

您现在要做的是首先按时间索引。恐怕您需要创建额外的列族:

rowId = xxx // whatever value, doest not really matter

column name = timestamp

column value = sensor ID
于 2013-09-07T09:47:57.067 回答
0

@userxxxx

“我已经实施了您的建议并且它有效,除了一个错误。如果在同一“时间”有多个传感器数据点,则只显示最后保存的数据点的名称。

轻松修复:

rowId = xxx // whatever value, doest not really matter

column name = composite of(timestamp,sensorId)

column value = nothing

通过将列名设置为时间戳和 sensorId 的组合,您将涵盖同时拥有多个传感器的情况。

由于 sensorID 信息直接存储在列中,因此您不再需要列值。这称为无列族

创建此类表的 CQL 脚本

CREATE TABLE sensor_index_by_date
(

   row_id text, // whatever
   date timestamp,
   sensor_id bigint,
   PRIMARY KEY(rowId,date,sensor_id)
);
于 2013-09-07T16:32:44.650 回答