0

我有两个Runnable班,ReaderWriter

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.concurrent.Exchanger;

public class Reader implements Runnable {
    private static final int THRESHHOLD = 1000;
    private final int START, STOP;
    private Exchanger<ByteBuffer> exch;
    private RandomAccessFile file;
    private ByteBuffer buffer;

    public Reader(Exchanger<ByteBuffer> ex, RandomAccessFile f, int start, int stop) {
        START = start;
        STOP = stop;
        exch = ex;
        file = f;
        buffer = ByteBuffer.allocate(THRESHHOLD);
        buffer.mark();
    }

    @Override
    public void run() {
        for(int i = START; i < STOP; i++)
            try {
                buffer.put((byte)file.read());
            } catch(IOException e) {
                e.printStackTrace();
            }
        try {
            exch.exchange(buffer);
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}



import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.concurrent.Exchanger;
import java.util.concurrent.locks.ReentrantLock;

public class Writer implements Runnable {
    private static final int THRESHHOLD = 1000;
    private final int START, STOP;
    private ReentrantLock lock;
    private Exchanger<ByteBuffer> exch;
    private RandomAccessFile file;
    private ByteBuffer buffer;

    public Writer(Exchanger<ByteBuffer> e, ReentrantLock l, RandomAccessFile f, int start, int stop) {
        lock = l;
        START = start;
        STOP = stop;
        exch = e;
        file = f;
    }

    @Override
    public void run() {
        try {
            buffer = exch.exchange(ByteBuffer.allocate(THRESHHOLD));
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
        lock.lock();
        for(int i = START; i < STOP; i++)
            try {
                file.write(buffer.get());
            } catch(IOException e) {
                e.printStackTrace();
            }
        lock.unlock();
    }

}

这两个线程都使用 anExchanger来交换相同类型的数据。如何确保只在ReaderWriter线程之间进行交换,而不是在 smae 类型的两个线程之间进行?

4

2 回答 2

0

如果您担心“同类”的两个线程会调用同一个 Exchanger 实例的 exchange 方法(从您带来的示例中很难猜出它是如何实现的,但您更了解您的工作组合),那么您可以拦截(通过子类化或委托,你更喜欢哪个)这个方法并检查线程的组合是对还是错,以及在每种情况下应该做什么。这是一个通过子分类的示例,仅针对最简单的情况进行了测试

public class ThreadStrictExchanger<V> extends Exchanger<V> {
    private Thread waitingThread; 

    @Override
    public V exchange(V x) throws InterruptedException {
        Thread currentThread = Thread.currentThread();
        if (waitingThread == null){
            waitingThread = currentThread;
        } else {
            checkThreads(waitingThread, currentThread);
            waitingThread = null;
        }
        return super.exchange(x);
    }

    private void checkThreads(Thread waitingThread, Thread currentThread) {
        //TODO add here your logic/change  
    }
}

您可以按类型或名称比较两个线程(使用 Thread.setName()/getName())。如果 checkThreads 方法中的两个线程的组合是错误的,则由您决定采取什么措施 - 抛出异常甚至返回 false 以指示应跳过 Exchanger.exchange 的实际调用(如果它不破坏调用者的逻辑)。

只需在 Reader 和 Writer ctors 的调用中将 Exchanger 替换为您的类即可。请注意,不会覆盖另一种交换方法,但如果需要,逻辑将是相同的。

于 2013-07-16T22:10:59.710 回答
0

你的问题有点不清楚。由于您在这些读取器和写入器之间共享相同的交换器实例,因此其他线程将无法参与此交换。

于 2013-07-16T17:22:58.407 回答