0

在我们的 ActivePivot 解决方案中,我们编写了一个后处理器,它根据股票价格(和波动率参数)计算股票期权的价格。当它被评估时,后处理器(暂时)连接到谷歌金融服务以即时检索股票价格。因此,每次用户在 ActivePivot 上进行查询时,都会使用最新价格实时计算聚合。

但我们也想利用 ActivePivot 中的连续查询,并将更改的聚合实时推送给用户(而不是定期点击 ActivePivot Live 的刷新按钮)。我们知道它通常是通过编写一个连续的处理程序来实现的,它将价格变化事件传播到 ActivePivot 并让 ActivePivot 计算对订阅查询的影响。但谷歌财经不提供推送 API,如果我们通过定期轮询数百只股票来打击这项服务,我们将被禁止。

您建议在 ActivePivot 中使用什么机制来解决此问题?

4

1 回答 1

0

当后处理器的数据源太不可预测时(例如在您的情况下没有推送功能的远程 Web 服务),最好的方法是为此后处理器创建一个和一个处理程序,告诉ActivePivot整个结果这个后置处理器每N秒改变一次。

您可以创建一个 TickingStream,它发送一个具有固定周期的(空)事件,如下所示:

package com.quartetfs.pivot.sandbox.postprocessor.impl;

import java.util.Timer;
import java.util.TimerTask;

import com.quartetfs.biz.pivot.IActivePivotSession;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine;
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector;

/**
 * Stream sending an event at a regular rate.
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY)
public class TickingStream extends AStream<Void> {

    private static final long serialVersionUID = 1L;

    public static final String PLUGIN_KEY = "TICKING";

    /** The default ticking period, in ms. **/
    protected static final long DEFAULT_PERIOD = 1000;

    /** The ticking period, in ms. **/
    protected long period = DEFAULT_PERIOD;

    /** The task responsible for sending the ticking events. */
    protected final TimerTask sendEventTask;

    /** The timer that schedules the {@link #sendEventTask}. */
    protected Timer tickingTimer;

    /**
     * Create a ticking stream.
     *
     * @param engine
     * @param session
     */
    public TickingStream(IAggregatesContinuousQueryEngine engine,
            IActivePivotSession session) {
        super(engine, session);

        // Create the task that will send the events.
        sendEventTask = new TimerTask() {

            @Override
            public void run() {
                sendEvent(null);
            }
        };
        // Schedule this task with the default period:
        rescheduleTask();
    }

    /**
     * Schedule the {@link #sendEventTask} with the {@link #period} period.
     * Removes also all previous scheduling of this task.
     */
    protected void rescheduleTask() {
        if (tickingTimer != null) {
            tickingTimer.cancel();
        }
        tickingTimer = new Timer();
        tickingTimer.schedule(sendEventTask, 0, period);
    }

    /**
     * Change the ticking period of this stream. This will reschedule the task
     * according to this new period. This setter will be called via
     * {@link ExtendedPluginInjector extended plugin injection}
     *
     * @param period the period to set. Must be strictly positive.
     *
     * @throws IllegalArgumentException if period is smaller or equal to 0.
     */
    public void setPeriod(long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("Non-positive period.");
        }
        this.period = period;
        rescheduleTask();
    }

    /** {@inheritDoc} */
    @Override
    public Class<Void> getEventType() {
        return Void.class;
    }

    /** {@inheritDoc} */
    @Override
    public String getType() {
        return PLUGIN_KEY;
    }
}

以及要求刷新与后处理器相关的所有连续查询部分的处理程序在每个滴答时都有此处理程序:

package com.quartetfs.pivot.sandbox.postprocessor.impl;

import java.util.Collections;

import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.query.aggregates.IImpact;
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler;
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact;
import com.quartetfs.fwk.QuartetExtendedPluginValue;

/**
 * The handler associated with a {@link TickingStream}.
 *
 * This handler asks for a full refresh of the locations queried on
 * post-processors with this handler each time it receives a tick.
 * <p>
 * This is the handler to use for post processors that have unpredictable data
 * sources which prevent the creation of a stream and a handler that can decide
 * which subset of the currently queried locations should be updated in a
 * continuous query.
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY)
public class TickingHandler extends AAggregatesContinuousHandler<Void> {

    private static final long serialVersionUID = 1L;

    /**
     * @param pivot
     */
    public TickingHandler(IActivePivot pivot) {
        super(pivot);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The impact on a queried location is the whole location since there is no
     * way for us to know which part of the location should be updated or not.
     */
    @Override
    public IImpact computeImpact(ILocation location, Void event) {
        return new Impact(location, Collections.singleton(location), Collections.singleton(location));
    }

    /** {@inheritDoc} */
    @Override
    public String getStreamKey() {
        // This handler is made to be used with the TickingStream.
        return TickingStream.PLUGIN_KEY;
    }

    /** {@inheritDoc} */
    @Override
    public String getType() {
        return TickingStream.PLUGIN_KEY;
    }
}

并且您将后处理器配置为以这种方式使用此处理程序:

<measure name="..." folder="..." aggregationFunctions="...">
            <postProcessor pluginKey="yourPPpluginKey">
                <properties>
                    <entry key="continuousQueryHandlerKeys" value="TICKING" />
                </properties>
            </postProcessor>
        </measure>

在某种程度上,您将轮询 Google 金融服务,因为您的后处理器将每秒调用一次(默认周期)。但这只有在用户使用您的后处理器进行连续查询时才会发生,并且您的后处理器只会在用户查询的位置上被调用,因此希望您只需要一小部分 Google 财经信息。此外,对您的后处理器的调用将在多个用户之间共享,以便您向 Google 财经发出最少数量的查询。

于 2013-02-07T10:41:10.240 回答