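When the data source of a post-processor is too unpredictable (for example, as in your case, a remote web service with no push capability), the best approach is to create a stream and a handler for this post-processor that tell ActivePivot the post-processor's entire result changes every N seconds.
You can create a TickingStream that sends an (empty) event at a fixed period, as follows: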
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.Timer;
import java.util.TimerTask;
import com.quartetfs.biz.pivot.IActivePivotSession;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine;
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector;
/**
* Stream sending an event at a regular rate.
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY)
public class TickingStream extends AStream<Void> {
    private static final long serialVersionUID = 1L;

    public static final String PLUGIN_KEY = "TICKING";

    /** The default ticking period, in ms. */
    protected static final long DEFAULT_PERIOD = 1000;

    /** The ticking period, in ms. */
    protected long period = DEFAULT_PERIOD;

    /** The task responsible for sending the ticking events. */
    protected final TimerTask sendEventTask;

    /** The timer that schedules the {@link #sendEventTask}. */
    protected Timer tickingTimer;

    /**
     * Create a ticking stream.
     *
     * @param engine
     * @param session
     */
    public TickingStream(IAggregatesContinuousQueryEngine engine,
            IActivePivotSession session) {
        super(engine, session);
        // Create the task that will send the events.
        sendEventTask = new TimerTask() {
            @Override
            public void run() {
                sendEvent(null);
            }
        };
        // Schedule this task with the default period:
        rescheduleTask();
    }
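    /**
     * Schedule the {@link #sendEventTask} with the {@link #period} period.
     * Also removes any previous scheduling of this task.
     */
    protected void rescheduleTask() {
        if (tickingTimer != null) {
            tickingTimer.cancel();
        }
        tickingTimer = new Timer();
        // A TimerTask instance can be scheduled only once: scheduling it again
        // throws an IllegalStateException. Wrap the work in a fresh task each time.
        tickingTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                sendEventTask.run();
            }
        }, 0, period);
    }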
    /**
     * Change the ticking period of this stream. This will reschedule the task
     * according to this new period. This setter will be called via
     * {@link ExtendedPluginInjector extended plugin injection}
     *
     * @param period the period to set. Must be strictly positive.
     *
     * @throws IllegalArgumentException if period is smaller or equal to 0.
     */
    public void setPeriod(long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("Non-positive period.");
        }
        this.period = period;
        rescheduleTask();
    }

    /** {@inheritDoc} */
    @Override
    public Class<Void> getEventType() {
        return Void.class;
    }

    /** {@inheritDoc} */
    @Override
    public String getType() {
        return PLUGIN_KEY;
    }
}
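One detail worth keeping in mind with java.util.Timer: a given TimerTask instance can only be scheduled once, which matters whenever the period is changed after construction. As a minimal sketch of the same "tick at a changeable period" idea using only the JDK, the class below relies on ScheduledExecutorService instead. PeriodicTicker is a hypothetical helper name and is not part of ActivePivot; in the stream above, the callback would be the call to sendEvent(null).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not an ActivePivot class): runs a callback at a fixed, changeable period. */
final class PeriodicTicker implements AutoCloseable {

    private final ScheduledExecutorService scheduler;
    private final Runnable onTick;
    private ScheduledFuture<?> current;

    PeriodicTicker(Runnable onTick, long periodMs) {
        this.onTick = onTick;
        // Daemon thread so that a forgotten ticker does not keep the JVM alive.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-ticker");
            t.setDaemon(true);
            return t;
        });
        setPeriod(periodMs);
    }

    /** Cancel the current schedule, if any, and start ticking at the new period. */
    synchronized void setPeriod(long periodMs) {
        if (periodMs <= 0) {
            throw new IllegalArgumentException("Non-positive period.");
        }
        if (current != null) {
            current.cancel(false);
        }
        // Note: if onTick throws, scheduleAtFixedRate suppresses later executions.
        current = scheduler.scheduleAtFixedRate(onTick, 0, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Unlike a TimerTask, the same Runnable can be handed to the scheduler as many times as needed, so changing the period is just a cancel followed by a new scheduleAtFixedRate call.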
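And a handler that, on every tick, asks for a refresh of every part of the continuous queries that involves a post-processor using this handler: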
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.Collections;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.query.aggregates.IImpact;
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler;
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
* The handler associated with a {@link TickingStream}.
*
* This handler asks for a full refresh of the locations queried on
* post-processors with this handler each time it receives a tick.
* <p>
* This is the handler to use for post processors that have unpredictable data
* sources which prevent the creation of a stream and a handler that can decide
* which subset of the currently queried locations should be updated in a
* continuous query.
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY)
public class TickingHandler extends AAggregatesContinuousHandler<Void> {
    private static final long serialVersionUID = 1L;

    /**
     * @param pivot
     */
    public TickingHandler(IActivePivot pivot) {
        super(pivot);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The impact on a queried location is the whole location since there is no
     * way for us to know which part of the location should be updated or not.
     */
    @Override
    public IImpact computeImpact(ILocation location, Void event) {
        return new Impact(location, Collections.singleton(location), Collections.singleton(location));
    }

    /** {@inheritDoc} */
    @Override
    public String getStreamKey() {
        // This handler is made to be used with the TickingStream.
        return TickingStream.PLUGIN_KEY;
    }

    /** {@inheritDoc} */
    @Override
    public String getType() {
        return TickingStream.PLUGIN_KEY;
    }
}
You then configure your post-processor to use this handler as follows:
<measure name="..." folder="..." aggregationFunctions="...">
    <postProcessor pluginKey="yourPPpluginKey">
        <properties>
            <entry key="continuousQueryHandlerKeys" value="TICKING" />
        </properties>
    </postProcessor>
</measure>
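In a way, you will be polling the Google Finance service, since your post-processor will be called once per second (the default period). But this only happens while users are running continuous queries that use your post-processor, and your post-processor is only called on the locations those users are querying, so hopefully you will only need a small part of the Google Finance information. In addition, the calls to your post-processor are shared between multiple users, so you issue the minimum number of queries to Google Finance.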
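If the post-processor itself still ends up asking the web service for the same symbol several times within one tick (for instance because several queried locations need the same quote), one possible complement is a small time-bounded cache in front of the remote call. The following is only a sketch under that assumption: ExpiringQuoteCache, the remoteLookup function and the injected clock are hypothetical names, and none of this is ActivePivot or Google Finance API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.ToDoubleFunction;

/** Hypothetical helper: lookups for the same symbol within ttlMs share one remote call. */
final class ExpiringQuoteCache {

    /** A cached value together with the time it was loaded. */
    private static final class Entry {
        final double value;
        final long loadedAtMs;

        Entry(double value, long loadedAtMs) {
            this.value = value;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final ToDoubleFunction<String> remoteLookup; // stands in for the web service call
    private final long ttlMs;
    private final LongSupplier clock; // injected so the cache can be tested without sleeping

    ExpiringQuoteCache(ToDoubleFunction<String> remoteLookup, long ttlMs, LongSupplier clock) {
        this.remoteLookup = remoteLookup;
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    /** Return the cached quote if it is younger than ttlMs, otherwise reload it. */
    double get(String symbol) {
        long now = clock.getAsLong();
        // compute() is atomic per key, so concurrent callers trigger a single reload.
        Entry entry = entries.compute(symbol, (key, old) ->
                (old != null && now - old.loadedAtMs < ttlMs)
                        ? old
                        : new Entry(remoteLookup.applyAsDouble(key), now));
        return entry.value;
    }
}
```

With a time-to-live equal to the ticking period, each symbol would be fetched at most once per tick regardless of how many locations need it. Passing System::currentTimeMillis as the clock gives wall-clock expiry.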