In a code example, thread resynchronization is said to be signal-based, using an indebted semaphore.

final Semaphore indebtedSemaphore = new Semaphore(1 - PROCESSOR_COUNT);

What is the purpose of this negative semaphore? On my laptop it is initialized with -3.

/**
 * Sums two vectors, distributing the work load into as many new child threads as there
 * are processor cores within a given system. Note that the added cost of thread
 * construction and destruction is higher than the gain of distributing the work for
 * practically any vector size.
 * @param leftOperand the first operand
 * @param rightOperand the second operand
 * @return the resulting vector
 * @throws NullPointerException if one of the given parameters is null
 * @throws IllegalArgumentException if the given parameters do not share the same length
 */
public static double[] add(final double[] leftOperand, final double[] rightOperand) {
    if (leftOperand.length != rightOperand.length) throw new IllegalArgumentException();
    final double[] result = new double[leftOperand.length];

    final int sectorWidth = leftOperand.length / PROCESSOR_COUNT;
    final int sectorThreshold = leftOperand.length % PROCESSOR_COUNT;
    final Semaphore indebtedSemaphore = new Semaphore(1 - PROCESSOR_COUNT);

    for (int threadIndex = 0; threadIndex < PROCESSOR_COUNT; ++threadIndex) {
        final int startIndex = threadIndex * sectorWidth + (threadIndex < sectorThreshold ? threadIndex : sectorThreshold);
        final int stopIndex  = startIndex  + sectorWidth + (threadIndex < sectorThreshold ? 1 : 0);
        final Runnable runnable = new Runnable() {
            public void run() {
                try {
                    for (int index = startIndex; index < stopIndex; ++index) {
                        result[index] = leftOperand[index] + rightOperand[index];
                    }
                } finally {
                    indebtedSemaphore.release();
                }
            }
        };
        // EXECUTOR_SERVICE.execute(runnable);                          // uncomment for managed thread alternative!
        new Thread(runnable).start();                                   // comment for managed thread alternative!
    }

    indebtedSemaphore.acquireUninterruptibly();
    return result;
}
1 Answer

See the comments added to the source code:

public static double[] add (final double[] leftOperand, final double[] rightOperand) {
    if (leftOperand.length != rightOperand.length) throw new IllegalArgumentException();
    final double[] result = new double[leftOperand.length];
    //--------------------------------------------------------------
    // EXAMPLE: 3 cores, vector length 10
    //--------------------------------------------------------------

    final int sectorWidth = leftOperand.length / PROCESSOR_COUNT;
    final int sectorThreshold = leftOperand.length % PROCESSOR_COUNT;
    // starts at 1 - 3 = -2 permits: all three threads must release before one permit becomes available
    final Semaphore indebtedSemaphore = new Semaphore(1 - PROCESSOR_COUNT);

    // one thread per core
    for (int threadIndex = 0; threadIndex < PROCESSOR_COUNT; ++threadIndex) {
        // index ranges [0,4), [4,7), [7,10)
        final int startIndex = threadIndex * sectorWidth + (threadIndex < sectorThreshold ? threadIndex : sectorThreshold);
        final int stopIndex = startIndex + sectorWidth + (threadIndex < sectorThreshold ? 1 : 0);
        final Runnable runnable = new Runnable() {
            public void run () {
                try {
                    for (int index = startIndex; index < stopIndex; ++index) {
                        result[index] = leftOperand[index] + rightOperand[index];
                    }
                } finally {
                    // each release repays one permit of debt: -2 -> -1 -> 0 -> 1
                    indebtedSemaphore.release();
                }
            }
        };
        // EXECUTOR_SERVICE.execute(runnable);                          // uncomment for managed thread alternative!
        new Thread(runnable).start();                                   // comment for managed thread alternative!
    }

    // blocks until the count reaches 1, i.e. until every worker thread has released
    indebtedSemaphore.acquireUninterruptibly();
    return result;
}
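
To see the counting in isolation, here is a minimal sketch that leaves out the vector arithmetic and only tracks the permits. The class name IndebtedSemaphoreDemo and the method run are made up for this illustration and are not part of the original example. It relies on the documented behaviour that a java.util.concurrent.Semaphore may be constructed with a negative permit count, in which case releases must occur before any acquire is granted.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the original example.
final class IndebtedSemaphoreDemo {

    /**
     * Starts workerCount threads that each release one permit, then waits for all of them.
     * Returns { permits before the workers start, 1 if an early tryAcquire succeeded else 0,
     * permits after the final acquire }.
     */
    static int[] run(final int workerCount) {
        // Negative initial count: workerCount releases are needed to reach +1.
        final Semaphore indebtedSemaphore = new Semaphore(1 - workerCount);
        final int permitsBefore = indebtedSemaphore.availablePermits();

        // No permit is available yet (for workerCount >= 1), so a non-blocking attempt fails.
        final boolean acquiredEarly = indebtedSemaphore.tryAcquire();

        for (int i = 0; i < workerCount; ++i) {
            new Thread(new Runnable() {
                public void run() {
                    indebtedSemaphore.release();   // repay one permit of debt
                }
            }).start();
        }

        // Blocks until the count reaches 1, i.e. until every worker has released.
        indebtedSemaphore.acquireUninterruptibly();
        final int permitsAfter = indebtedSemaphore.availablePermits();

        return new int[] { permitsBefore, acquiredEarly ? 1 : 0, permitsAfter };
    }

    public static void main(final String[] args) {
        final int[] observed = run(4);
        // prints "before=-3 earlyAcquire=0 after=0"
        System.out.println("before=" + observed[0] + " earlyAcquire=" + observed[1] + " after=" + observed[2]);
    }
}
```

With four workers the semaphore starts at -3, which matches the value seen on a four-core laptop. The single acquire in the parent thread can only succeed after the fourth release, so it acts as a join on all workers. A java.util.concurrent.CountDownLatch created with the worker count, with countDown() in each worker and await() in the parent, is the more common way to express the same wait.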
answered 2014-02-20T13:56:41.097