1

我考虑使用 ActivePivot 实例来计算 CVA(信用估值调整)。

我必须在大量单元格(每个交易对手 20k)上应用一段逻辑,每个单元格都与大小为 10k 的浮点数组相关联。即使 ActivePivot 是大规模多线程的,ABasicPostProcessor 也会以单线程方式应用于每个范围位置。我怎样才能让它以多线程方式通过我的点位置进行计算?

4

2 回答 2

1

我构建了以下类,它专门通过以多线程方式添加对 doEvaluation 的调用来专门化 ABasicPostProcessor(一个能够快速实现每点后处理器的核心类)。

给定 ABasicPostProcessor 专业化,只需扩展 AParallelBasicPostProcessor 以获得并行评估!

/**
 * Specialization of ABasicPostProcessor which will call doEvaluation in a
 * multithreaded way
 * 
 * @author BLA
 */
public abstract class AParallelBasicPostProcessor<OutputType> extends ABasicPostProcessor<OutputType> {
    private static final long serialVersionUID = -3453966549173516186L;

    public AParallelBasicPostProcessor(String name, IActivePivot pivot) {
        super(name, pivot);
    }

    @Override
    public void evaluate(ILocation location, final IAggregatesRetriever retriever) throws QuartetException {
        // Retrieve required aggregates
        final ICellSet cellSet = retriever.retrieveAggregates(Collections.singleton(location), Arrays.asList(prefetchMeasures));

        // Prepare a List
        List<ALocatedRecursiveTask<OutputType>> tasks = new ArrayList<ALocatedRecursiveTask<OutputType>>();

        // Create the procedure to hold the parallel sub-tasks
        final ICellsProcedure subTasksGeneration = makeSubTasksGenerationProcedure(tasks);

        cellSet.forEachLocation(subTasksGeneration, underlyingMeasures);

        ForkJoinTask.invokeAll(tasks);

        for (ALocatedRecursiveTask<OutputType> task : tasks) {
            OutputType returnValue;
            try {
                returnValue = task.get();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                // re-throw the root cause of the ExecutionException
                throw new RuntimeException(e.getCause());
            }

            // We can write only non-null aggregates
            if (null != returnValue) {
                writeInRetriever(retriever, task.getLocation(), returnValue);
            }
        }
    }

    protected void writeInRetriever(IAggregatesRetriever retriever, ILocation location, OutputType returnValue) {
        retriever.write(location, returnValue);
    }

    protected ICellsProcedure makeSubTasksGenerationProcedure(List<ALocatedRecursiveTask<OutputType>> futures) {
        return new SubTasksGenerationProcedure(futures);
    }

    /**
     * {@link ICellsProcedure} registering a {@link ALocatedRecursiveTask} per
     * point location
     */
    protected class SubTasksGenerationProcedure implements ICellsProcedure {

        protected List<ALocatedRecursiveTask<OutputType>> futures;

        public SubTasksGenerationProcedure(List<ALocatedRecursiveTask<OutputType>> futures) {
            this.futures = futures;
        }

        @Override
        public boolean execute(final ILocation pointLocation, int rowId, Object[] measures) {
            // clone the array of measures as it is internally used as a buffer
            final Object[] clone = measures.clone();

            futures.add(makeLocatedFuture(pointLocation, clone));

            return true;
        }
    }

    protected ALocatedRecursiveTask<OutputType> makeLocatedFuture(ILocation pointLocation, Object[] measures) {
        return new LocatedRecursiveTask(pointLocation, measures);
    }

    /**
     * A specialization of RecursiveTask by associating it to a
     * {@link ILocation}
     * 
     * @author BLA
     * 
     */
    protected static abstract class ALocatedRecursiveTask<T> extends RecursiveTask<T> {
        private static final long serialVersionUID = -6014943980790547011L;

        public abstract ILocation getLocation();
    }

    /**
     * Default implementation of {@link ALocatedRecursiveTask}
     * 
     * @author BLA
     * 
     */
    protected class LocatedRecursiveTask extends ALocatedRecursiveTask<OutputType> {
        private static final long serialVersionUID = 676859831679236794L;

        protected ILocation pointLocation;
        protected Object[] measures;

        public LocatedRecursiveTask(ILocation pointLocation, Object[] measures) {
            this.pointLocation = pointLocation;
            this.measures = measures;

            if (pointLocation.isRange()) {
                throw new RuntimeException(this.getClass() + " accepts only point location: " + pointLocation);
            }
        }

        @Override
        protected OutputType compute() {
            try {
                // The custom evaluation will be computed in parallel
                return AParallelBasicPostProcessor.this.doEvaluation(pointLocation, measures);
            } catch (QuartetException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public ILocation getLocation() {
            return pointLocation;
        }
    }
}
于 2012-09-27T19:59:00.923 回答
0

ActivePivot 查询引擎是多线程的,在单个查询中调用多个后处理器是并行完成的(当然,除非一个依赖于另一个的结果)。当同一个后处理器在查询中涉及的位置多次执行时,这也是并行完成的。因此,在卷起袖子之前,有必要检查一下您的查询计划中是否没有更明显的瓶颈。

现在,在一个位置调用一个后处理器确实是 ActivePivot 查询引擎中不可分割的工作负载。如果聚合不仅仅是以纳秒为单位的数字,而是像向量这样的大型或结构化对象,则可能存在并行驱动性能提升的空间。

ActivePivot 查询引擎构建在 fork/join 池 ( http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html ) 之上。这意味着您的后处理器代码总是从分叉连接池中调用,这使得您可以分叉您自己的子任务,然后加入它们。这被认为是一个专家技巧,如果没有对分叉连接池的工作原理有一个公平的了解,请不要尝试。

让我们考虑一个后处理器,它为每个评估的位置计算几个度量的最大值:

package com.quartetfs.pivot.sandbox.postprocessor.impl;

import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.postprocessing.impl.ABasicPostProcessor;
import com.quartetfs.fwk.QuartetException;
import com.quartetfs.fwk.QuartetExtendedPluginValue;

/**
 * 
 * Post processor that computes the MAX of several measures.
 * 
 * @author Quartet FS
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.postprocessing.IPostProcessor", key = MaxPostProcessor.TYPE)
public class MaxPostProcessor extends ABasicPostProcessor<Double> {

    /** serialVersionUID */
    private static final long serialVersionUID = -8886545079342151420L;

    /** Plugin type */
    public static final String TYPE = "MAX";

    public MaxPostProcessor(String name, IActivePivot pivot) {
        super(name, pivot);
    }

    @Override
    public String getType() { return TYPE; }

    @Override
    protected Double doEvaluation(ILocation location, Object[] measures) throws QuartetException {
        double max = ((Number) measures[0]).doubleValue();
        for(int i = 1; i < measures.length; i++) {
            max = Math.max(max, ((Number) measures[i]).doubleValue());
        }
        return max;
    }

}

在该后处理器中,由评估的范围位置产生的叶位置将一个接一个地计算。您可以决定改为创建任务,并通过分叉连接池并行执行这些任务。我希望以下内容可以帮助您入门:

package com.quartetfs.pivot.sandbox.postprocessor.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;

import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.cellset.ICellSet;
import com.quartetfs.biz.pivot.cellset.ICellsProcedure;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesRetriever;
import com.quartetfs.fwk.QuartetException;
import com.quartetfs.fwk.QuartetExtendedPluginValue;

/**
 * 
 * Post processor that computes the MAX of several measures,
 * evaluation of locations is performed in parallel.
 * 
 * @author Quartet FS
 *
 */
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.postprocessing.IPostProcessor", key = ParallelMaxPostProcessor.TYPE)
public class ParallelMaxPostProcessor extends MaxPostProcessor {

    /** serialVersionUID */
    private static final long serialVersionUID = -8886545079342151420L;

    /** Plugin type */
    public static final String TYPE = "PMAX";

    public ParallelMaxPostProcessor(String name, IActivePivot pivot) {
        super(name, pivot);
    }

    @Override
    public String getType() { return TYPE; }

    @Override
    public void evaluate(ILocation location, IAggregatesRetriever retriever)throws QuartetException {
        try {
            // Retrieve required aggregates
            ICellSet cellSet = retriever.retrieveAggregates(Collections.singleton(location), Arrays.asList(prefetchMeasures));

            // Evaluate the cell set to create tasks
            ParallelEvaluationProcedure evalProcedure = new ParallelEvaluationProcedure();
            cellSet.forEachLocation(evalProcedure);

            // Execute the tasks in parallel and write results
            evalProcedure.writeResults(retriever);

        } catch(Exception e) {
            throw new QuartetException("Evaluation of " + this + " on location " + location + " failed.", e);
        }
    }

    /**
     * Procedure evaluated on the cell set.
     */
    protected class ParallelEvaluationProcedure implements ICellsProcedure {

        /** List of tasks */
        protected final List<MaxComputation> tasks = new ArrayList<ParallelMaxPostProcessor.MaxComputation>();

        @Override
        public boolean execute(ILocation location, int rowId, Object[] measures) {
            Object[] numbers = measures.clone();
            tasks.add(new MaxComputation(location, numbers));
            return true;  // continue
        }

        /** Once all the tasks are executed, write results */
        public void writeResults(IAggregatesRetriever retriever) throws Exception {

            // Invoke all the tasks in parallel
            // using the fork join pool that runs the post processor.
            ForkJoinTask.invokeAll(tasks);

            for(MaxComputation task : tasks) {
                retriever.write(task.location, task.get());
            }
        }
    }


    /**
     * Max computation task. It illustrates our example well
     * but in real-life this would be too little
     * of a workload to deserve parallel execution.
     */
    protected class MaxComputation extends RecursiveTask<Double> {

        /** serialVersionUID */
        private static final long serialVersionUID = -5843737025175189495L;

        final ILocation location;
        final Object[] numbers;

        public MaxComputation(ILocation location, Object[] numbers) {
            this.location = location;
            this.numbers = numbers;
        }

        @Override
        protected Double compute() {
            try {
                return doEvaluation(location, numbers);
            } catch (QuartetException e) {
                completeExceptionally(e);
                return null;
            }
        }
    }


}
于 2012-09-27T15:38:26.030 回答