0

我想做 Flink CEP 引擎的性能分析,我遇到了这些类

org.apache.flink.optimizer.costs.CostEstimator; 
org.apache.flink.optimizer.costs.Costs; 
org.apache.flink.optimizer.costs.DefaultCostEstimator;

但问题是我不知道如何使用这两个类。有人可以向我提供代码或暗示,我如何在 Flink 中找到运营商 { 例如加入} 的成本估算。

下面是我在 Flink 中执行的连接代码

DataStream<JoinedEvent> joinedEventDataStream = stream1.join(stream2).where(new KeySelector<RRIntervalStreamEvent, Long>() {
        @Override
        public Long getKey(RRIntervalStreamEvent rrIntervalStreamEvent) throws Exception {
            return rrIntervalStreamEvent.getTime();
        }
    })
            .equalTo(new KeySelector<qrsIntervalStreamEvent, Long>() {
        @Override
        public Long getKey(qrsIntervalStreamEvent qrsIntervalStreamEvent) throws Exception {
            return qrsIntervalStreamEvent.getTime();
        }
    })
            .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
            .apply(new JoinFunction<RRIntervalStreamEvent, qrsIntervalStreamEvent, JoinedEvent>() {
                @Override
                public JoinedEvent join(RRIntervalStreamEvent rr, qrsIntervalStreamEvent qrs) throws Exception {

                    //getting the cost -- just checking

                   // costs.getCpuCost();

                    return new JoinedEvent(rr.getTime(),rr.getSensor_id(),qrs.getSensor_id(),rr.getRRInterval(),qrs.getQrsInterval());
                }
            });

如何计算此连接的成本?

4

1 回答 1

1

成本类属于 DataSet API(Flink 的批处理 API)的优化器,而 CEP 库是基于 DataStream API 构建的。DataStream API 不利用 DataSet API。

CEP 库和 DataSet 优化器完全不相关。因此,不可能使用此代码来估计 CEP 模式的成本。我也不知道另一种内置方法来估计 CEP 模式(或任何其他 DataStream 程序)的成本。

于 2017-10-23T07:44:38.837 回答