我考虑使用 ActivePivot 实例来计算 CVA(信用估值调整)。
我必须在大量单元格(每个交易对手 20k)上应用一段逻辑,每个单元格都与大小为 10k 的浮点数组相关联。即使 ActivePivot 是大规模多线程的,ABasicPostProcessor 也会以单线程方式应用于每个范围位置。我怎样才能让它以多线程方式通过我的点位置进行计算?
我考虑使用 ActivePivot 实例来计算 CVA(信用估值调整)。
我必须在大量单元格(每个交易对手 20k)上应用一段逻辑,每个单元格都与大小为 10k 的浮点数组相关联。即使 ActivePivot 是大规模多线程的,ABasicPostProcessor 也会以单线程方式应用于每个范围位置。我怎样才能让它以多线程方式通过我的点位置进行计算?
我构建了以下类,它对 ABasicPostProcessor(一个便于快速实现逐点位置后处理器的核心类)进行了特化,使 doEvaluation 以多线程方式被调用。
对于任何已有的 ABasicPostProcessor 子类,只需改为继承 AParallelBasicPostProcessor,即可获得并行计算!
/**
* Specialization of ABasicPostProcessor which will call doEvaluation in a
* multithreaded way
*
* @author BLA
*/
public abstract class AParallelBasicPostProcessor<OutputType> extends ABasicPostProcessor<OutputType> {
private static final long serialVersionUID = -3453966549173516186L;
public AParallelBasicPostProcessor(String name, IActivePivot pivot) {
super(name, pivot);
}
@Override
public void evaluate(ILocation location, final IAggregatesRetriever retriever) throws QuartetException {
// Retrieve required aggregates
final ICellSet cellSet = retriever.retrieveAggregates(Collections.singleton(location), Arrays.asList(prefetchMeasures));
// Prepare a List
List<ALocatedRecursiveTask<OutputType>> tasks = new ArrayList<ALocatedRecursiveTask<OutputType>>();
// Create the procedure to hold the parallel sub-tasks
final ICellsProcedure subTasksGeneration = makeSubTasksGenerationProcedure(tasks);
cellSet.forEachLocation(subTasksGeneration, underlyingMeasures);
ForkJoinTask.invokeAll(tasks);
for (ALocatedRecursiveTask<OutputType> task : tasks) {
OutputType returnValue;
try {
returnValue = task.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
// re-throw the root cause of the ExecutionException
throw new RuntimeException(e.getCause());
}
// We can write only non-null aggregates
if (null != returnValue) {
writeInRetriever(retriever, task.getLocation(), returnValue);
}
}
}
protected void writeInRetriever(IAggregatesRetriever retriever, ILocation location, OutputType returnValue) {
retriever.write(location, returnValue);
}
protected ICellsProcedure makeSubTasksGenerationProcedure(List<ALocatedRecursiveTask<OutputType>> futures) {
return new SubTasksGenerationProcedure(futures);
}
/**
* {@link ICellsProcedure} registering a {@link ALocatedRecursiveTask} per
* point location
*/
protected class SubTasksGenerationProcedure implements ICellsProcedure {
protected List<ALocatedRecursiveTask<OutputType>> futures;
public SubTasksGenerationProcedure(List<ALocatedRecursiveTask<OutputType>> futures) {
this.futures = futures;
}
@Override
public boolean execute(final ILocation pointLocation, int rowId, Object[] measures) {
// clone the array of measures as it is internally used as a buffer
final Object[] clone = measures.clone();
futures.add(makeLocatedFuture(pointLocation, clone));
return true;
}
}
protected ALocatedRecursiveTask<OutputType> makeLocatedFuture(ILocation pointLocation, Object[] measures) {
return new LocatedRecursiveTask(pointLocation, measures);
}
/**
* A specialization of RecursiveTask by associating it to a
* {@link ILocation}
*
* @author BLA
*
*/
protected static abstract class ALocatedRecursiveTask<T> extends RecursiveTask<T> {
private static final long serialVersionUID = -6014943980790547011L;
public abstract ILocation getLocation();
}
/**
* Default implementation of {@link ALocatedRecursiveTask}
*
* @author BLA
*
*/
protected class LocatedRecursiveTask extends ALocatedRecursiveTask<OutputType> {
private static final long serialVersionUID = 676859831679236794L;
protected ILocation pointLocation;
protected Object[] measures;
public LocatedRecursiveTask(ILocation pointLocation, Object[] measures) {
this.pointLocation = pointLocation;
this.measures = measures;
if (pointLocation.isRange()) {
throw new RuntimeException(this.getClass() + " accepts only point location: " + pointLocation);
}
}
@Override
protected OutputType compute() {
try {
// The custom evaluation will be computed in parallel
return AParallelBasicPostProcessor.this.doEvaluation(pointLocation, measures);
} catch (QuartetException e) {
throw new RuntimeException(e);
}
}
@Override
public ILocation getLocation() {
return pointLocation;
}
}
}
ActivePivot 查询引擎是多线程的,在单个查询中调用多个后处理器是并行完成的(当然,除非一个依赖于另一个的结果)。当同一个后处理器在查询中涉及的位置多次执行时,这也是并行完成的。因此,在卷起袖子之前,有必要检查一下您的查询计划中是否没有更明显的瓶颈。
不过,在单个位置上调用一次后处理器,确实是 ActivePivot 查询引擎中不可再分的工作单元。如果聚合值不是只需几纳秒就能完成后处理的简单数字,而是向量这类大型或结构化的对象,那么通过并行化来提升性能就可能还有空间。
ActivePivot 查询引擎构建在 fork/join 池 ( http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html ) 之上。这意味着您的后处理器代码总是在 fork/join 池内被调用,因此您可以 fork 出自己的子任务,然后再 join 它们。这属于专家级技巧,如果对 fork/join 池的工作原理没有足够的了解,请不要尝试。
让我们考虑一个后处理器,它为每个评估的位置计算几个度量的最大值:
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.postprocessing.impl.ABasicPostProcessor;
import com.quartetfs.fwk.QuartetException;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
 * Basic post processor whose value at a location is the largest of the
 * measures handed to it, each measure being read as a {@code double}.
 *
 * @author Quartet FS
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.postprocessing.IPostProcessor", key = MaxPostProcessor.TYPE)
public class MaxPostProcessor extends ABasicPostProcessor<Double> {

    /** Serialization version identifier */
    private static final long serialVersionUID = -8886545079342151420L;

    /** Key under which this plugin is registered */
    public static final String TYPE = "MAX";

    /**
     * @param name name forwarded to the parent constructor
     * @param pivot pivot forwarded to the parent constructor
     */
    public MaxPostProcessor(String name, IActivePivot pivot) {
        super(name, pivot);
    }

    /**
     * @return the plugin key {@link #TYPE}
     */
    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Computes the maximum of the given measures.
     *
     * @param location the evaluated location (not used by the computation)
     * @param measures the measures to compare; each element is cast to {@link Number}
     * @return the largest measure as a double
     */
    @Override
    protected Double doEvaluation(ILocation location, Object[] measures) throws QuartetException {
        // The first measure seeds the running maximum
        double highest = ((Number) measures[0]).doubleValue();

        // Fold the remaining measures into the running maximum
        int index = 1;
        while (index < measures.length) {
            final double candidate = ((Number) measures[index]).doubleValue();
            highest = Math.max(highest, candidate);
            index++;
        }
        return highest;
    }
}
在该后处理器中,由被评估的范围位置展开得到的各个叶位置是一个接一个地依次计算的。您可以改为为它们创建任务,并通过 fork/join 池并行执行这些任务。希望以下示例可以帮助您入门:
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.cellset.ICellSet;
import com.quartetfs.biz.pivot.cellset.ICellsProcedure;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesRetriever;
import com.quartetfs.fwk.QuartetException;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
 *
 * Post processor that computes the MAX of several measures,
 * evaluation of locations is performed in parallel.
 * <p>
 * One {@link MaxComputation} task is created per cell visited in the
 * retrieved cell set; the tasks are invoked together and their results are
 * then written into the retriever.
 *
 * @author Quartet FS
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.postprocessing.IPostProcessor", key = ParallelMaxPostProcessor.TYPE)
public class ParallelMaxPostProcessor extends MaxPostProcessor {

    /** serialVersionUID */
    private static final long serialVersionUID = -8886545079342151420L;

    /** Plugin type */
    public static final String TYPE = "PMAX";

    /**
     * @param name name forwarded to the parent constructor
     * @param pivot pivot forwarded to the parent constructor
     */
    public ParallelMaxPostProcessor(String name, IActivePivot pivot) {
        super(name, pivot);
    }

    /**
     * @return the plugin key {@link #TYPE}
     */
    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Retrieves the aggregates for the location, creates one task per visited
     * cell, runs the tasks and writes their results.
     *
     * @param location the location to evaluate
     * @param retriever used both to retrieve the aggregates and to write results
     * @throws QuartetException wrapping any exception raised during the evaluation
     */
    @Override
    public void evaluate(ILocation location, IAggregatesRetriever retriever) throws QuartetException {
        try {
            // Retrieve required aggregates
            ICellSet cellSet = retriever.retrieveAggregates(Collections.singleton(location), Arrays.asList(prefetchMeasures));

            // Evaluate the cell set to create tasks
            ParallelEvaluationProcedure evalProcedure = new ParallelEvaluationProcedure();
            cellSet.forEachLocation(evalProcedure);

            // Execute the tasks in parallel and write results
            evalProcedure.writeResults(retriever);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure, so the
            // interruption is not lost by the generic wrapping below
            Thread.currentThread().interrupt();
            throw new QuartetException("Evaluation of " + this + " on location " + location + " failed.", e);
        } catch (Exception e) {
            throw new QuartetException("Evaluation of " + this + " on location " + location + " failed.", e);
        }
    }

    /**
     * Procedure evaluated on the cell set.
     */
    protected class ParallelEvaluationProcedure implements ICellsProcedure {

        /** List of tasks */
        protected final List<MaxComputation> tasks = new ArrayList<ParallelMaxPostProcessor.MaxComputation>();

        /**
         * Registers a task for the visited cell.
         *
         * @return always {@code true}, to continue the iteration
         */
        @Override
        public boolean execute(ILocation location, int rowId, Object[] measures) {
            // Work on a copy of the measures, the task keeps a reference to it
            Object[] numbers = measures.clone();
            tasks.add(new MaxComputation(location, numbers));
            return true; // continue
        }

        /**
         * Once all the tasks are executed, write results
         *
         * @param retriever the retriever receiving one value per task
         * @throws Exception any exception raised while getting a task result
         */
        public void writeResults(IAggregatesRetriever retriever) throws Exception {
            // Invoke all the tasks in parallel
            // using the fork join pool that runs the post processor.
            ForkJoinTask.invokeAll(tasks);

            for (MaxComputation task : tasks) {
                retriever.write(task.location, task.get());
            }
        }
    }

    /**
     * Max computation task. It illustrates our example well
     * but in real-life this would be too little
     * of a workload to deserve parallel execution.
     */
    protected class MaxComputation extends RecursiveTask<Double> {

        /** serialVersionUID */
        private static final long serialVersionUID = -5843737025175189495L;

        /** The location evaluated by this task */
        final ILocation location;

        /** The measures passed to doEvaluation */
        final Object[] numbers;

        /**
         * @param location the location to evaluate
         * @param numbers the measures passed to doEvaluation
         */
        public MaxComputation(ILocation location, Object[] numbers) {
            this.location = location;
            this.numbers = numbers;
        }

        /**
         * Delegates to doEvaluation; a {@link QuartetException} is recorded
         * through {@code completeExceptionally} and {@code null} is returned.
         */
        @Override
        protected Double compute() {
            try {
                return doEvaluation(location, numbers);
            } catch (QuartetException e) {
                completeExceptionally(e);
                return null;
            }
        }
    }
}