0

我想在 esper 中记录数据处理时间,我选择布林带作为示例。在布林带中,称为移动平均线 (MA)。该 MA 从计算股票价格平均值的结果中获得。在这种情况下,我设置了 win:length(20)。因此,可以从数据窗口视图中存在的 20 个事件计算股票价格平均值的结果中获得 MA。以下是我创建的代码。

public class BollingerBand {
    static double startTime, finishTime;

    public static void main (String [] args){
        Configuration configuration = new Configuration();
        configuration.addEventType("Stock", Stock.class);

        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(configuration);
        AdapterInputSource source = new AdapterInputSource("BollingerBand.csv");

        EPStatement statement = epService.getEPAdministrator().createEPL("insert into Aggregation " +
                "select prevcount(symbol), symbol, avg(price) as SimpleMovingAverage, stddev(price) as StandardDeviation, " +
                "last(price) as price, last(timestamp) as date from Stock.std:groupwin(symbol).win:length(20)" +
                " group by symbol having count(*) >=20");

        statement.addListener(new UpdateListener() {

            public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                // TODO Auto-generated method stub
                //System.out.println("Event Receive : "+newEvents[0].getUnderlying());
                startTime = System.currentTimeMillis();
                System.out.println("\nStart time : " + startTime + " miliseconds\n");
            }
        });

        EPStatement statement2 = epService.getEPAdministrator().createEPL("select symbol, " +
                 "SimpleMovingAverage + 2*StandardDeviation as UpperBand," +
                 "SimpleMovingAverage as MiddleBand," +
                 "SimpleMovingAverage - 2*StandardDeviation as LowerBand," +
                 "price," +
                 "4*StandardDeviation/SimpleMovingAverage as Bandwidth," +
                 "(price - (SimpleMovingAverage - (2 * StandardDeviation))) / ((SimpleMovingAverage + " +
                 "(2 * StandardDeviation)) - (SimpleMovingAverage - (2 * StandardDeviation))) as PercentB," +
                 "date from Aggregation");

        statement2.addListener(new UpdateListener() {

            public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                // TODO Auto-generated method stub
                //System.out.println("Event Receive : "+newEvents[0].getUnderlying());
                finishTime = System.currentTimeMillis();
                System.out.println("Start time : " + startTime + " miliseconds");
                System.out.println("Finish time : " + finishTime + " miliseconds");
                System.out.println("Processing time : " + (finishTime-startTime) + " miliseconds");
            }
        });

        (new CSVInputAdapter(epService, source, "Stock")).start();
    }

}

从上面的代码中,如果计算了平均值,将记录时间。但我需要的是我希望记录第 20 个事件和下一个事件进入数据窗口视图时的时间。它是从布林带计算结果中获得的开始时间和结束时间。我的问题是如何记录第 20 个事件的时间,并同时将下一个事件输入到窗口视图数据中。请帮忙

4

2 回答 2

0

发送事件时,CSV 适配器不提供回调。但是,您可以轻松更改其代码。或者您可以使用不同的 CSV 阅读器并通过运行时 API 发送事件。

于 2013-06-10T12:14:39.083 回答
0

也许有某种 TickCounter,其中有一个带有键值对(item_count 和时间戳)的映射。您在第二个 UpdateListener 中更新它,当然您可以随时查找键 20 的项目。

顺便说一句,我使用了您的布林带计算,但使用了 Storm 和 EsperBolt。在这里写博客:http: //chanchal.wordpress.com/2014/07/08/using-esperbolt-and-storm-to-calculate-bollinger-bands/

于 2014-07-08T13:06:35.407 回答