3

下面的代码是 Java 中传递接力棒程序的一部分:

主 P1(写入器) P2(写入器) P3(读取器) P4(读取器)

主要的();

package ReadersPreference;

import java.util.concurrent.Semaphore;

/**
  * * @author me
 */
public class Main {

public static void main(String[] args) {


    AnyData x = new AnyData(5.7);//gives writers something to write, 
                                   //readers something to read
    Semaphore e = new Semaphore(1);//control entry
    Semaphore r = new Semaphore(0);//used to delay readers
    Semaphore w = new Semaphore(0);//used to delay writers

    int nr = 0;//readers active
    int nw = 0;//writers active

    int dr = 0;//readers waiting
    int dw = 0;//writers waiting

    P1 r1 = new P1(e, r, w, x, nr, nw, dr, dw); // #reader thread 1
    P2 r2 = new P2(e, r, w, x, nr, nw, dr, dw); // #reader thread 2
    P5 r3 = new P5(e, r, w, x, nr, nw, dr, dw); // #reader thread 3
    P6 r4 = new P6(e, r, w, x, nr, nw, dr, dw); // #reader thread 4

    P3 w1 = new P3(e, r, w, x, nr, nw, dr, dw); // #writer thread 1
    P4 w2 = new P4(e, r, w, x, nr, nw, dr, dw); // #writer thread 2

System.out.println("threads commanded to start");

r1.start();     // calls run() method in Thread
r2.start();
r3.start();     
r4.start();   
w1.start();
w2.start();
}//end of main
}//end of class

读者进程

package ReadersPreference;

import java.util.concurrent.Semaphore;

public class P1 extends Thread {

private Semaphore e;
private Semaphore r;
private Semaphore w;
private AnyData pillarbox;
private int nw;
private int nr;
private int dr;
private int dw;

public P1(Semaphore e, Semaphore r, Semaphore w, AnyData pbox,
        int nw, int nr, int dr, int dw) {

    this.nw = nw;
    this.nr = nr;
    this.dr = dr;
    this.dw = dw;

    this.e = e;
    this.r = r;
    this.w = w;
    pillarbox = pbox;
}// end of constructor

public void run() {

PERFORM OPERATIONS          

}// end of run method
}// end of class

现在,根据我的输出,它似乎可以工作。然而,我的讲师指出了两个主要缺陷。一种是计数器 (nr,nw,dr,dw) 通过值而不是通过引用传递。这意味着每个线程检查自己的数据副本而不是使用共享变量,这会阻止程序按应有的方式运行。

我一直在阅读大量关于传递值和参考的内容,起初我觉得我的头很肿,我想我现在明白了大部分,但我仍然没有看到解决方案。我可以理解为什么信号量可以工作,因为它们已经是一个引用(而不是它们被传递的值是一个引用)。我不知道计数器的问题到底出在哪里,也不知道如何解决它。

当他说线程时,他指的是进程类(构造函数/算法)还是主类中的实例化,还是两者兼而有之?

基于将原语共享为对象的阅读,我提出的最接近解决方案的解决方案如下:

public void setCounters(int nr){nr = newNR;}   

再次,虽然我对如何实现它含糊不清

第二个可能的主要缺陷是我创建了几个进程类文件并将每个文件作为一个线程运行,而不是编写两个(读取器/写入器)并根据需要使用尽可能多的对象。我很模糊这意味着什么,而且,考虑到我的输出打印语句对于每个进程都是唯一的,在输出中唯一地标识每个以进行验证,为什么这种方法在出于某种目的而被认为是缺陷时,确实它会影响读者/作者的解决方案吗?

4

2 回答 2

2

在我开始之前有一件事,好的变量名总是比注释更好;坚持这条规则,你就不会出错。想象一下,我在代码的某个地方遇到了你的e变量,我现在必须滚动到类的顶部并阅读注释以查看它的用途,然后回到我原来的位置。这使得代码几乎不可读......

您的第一个问题是您使用int的是原始类型,这是按值传递的。@Marcin 的解决方案不是线程安全的;如果你做这样的事情,int++那么当被多个线程调用时,它可以做各种奇怪的事情(比如不递增、返回错误的值等)。在多线程操作中始终使用线程安全对象

正如@Marcin 建议的那样,您可以将数据包装在一个类中以减少代码量:

public class SharedData<T> {

    private final T data;
    private final Semaphore entryControl = new Semaphore(1);
    private final Semaphore readerDelay = new Semaphore(0);
    private final Semaphore writerDelay = new Semaphore(0);
    private final AtomicInteger activeReaders = new AtomicInteger(0);
    private final AtomicInteger activeWriters = new AtomicInteger(0);
    private final AtomicInteger waitingReaders = new AtomicInteger(0);
    private final AtomicInteger waitingWriters = new AtomicInteger(0);

    public SharedData(final T data) {
        this.data = data;
    }
    //getters
}

我已经把它变成了一个泛型类,但是如果你愿意,你可以删除泛型并将“数据”设置为“对象”——泛型更好,因为它提供了类型安全,请在此处阅读。

该类使用该AtomicInteger对象,这是一个线程安全整数,允许进行原子操作,例如- 这消除了访问单个getAndSet(int newValue)值时出现线程安全问题的可能性,但看起来您可能想要访问两个,这仍然不是线程安全的,所以你可能希望按照以下方式向数据类添加一些方法:

public synchronized void makeReaderActive() {
    //perform checks etc
    waitingReaders.decrementAndGet();
    activeReaders.incrementAndGet();
}

否则,读者可能会减少waitingReaders然后某些东西可能会读取activeReaders之前增加的内容。

我认为,这涉及您的第一个查询;现在到你的第二个。您的老师说您创建了几个流程类文件而不是创建实例。这是因为您复制粘贴了同一个文件,称它为不同的东西(P1,P2等),这不是 Java 的工作方式。考虑您创建Semaphores 的代码:

Semaphore e = new Semaphore(1);//control entry
Semaphore r = new Semaphore(0);//used to delay readers

你有一个类文件(Semaphore.class),你已经创建了两个实例。您不必将 JDK 的Semaphore类复制到另一个文件中并创建它。您不必这样做:

Semaphore1 e = new Semaphore1(1);//control entry
Semaphore2 r = new Semaphore2(0);//used to delay readers

所以; 在您的示例中,我假设您有一些Reader流程和一些Writer流程,然后您需要两个类:

public class MyReader implements Callable<Void> {

    private final String name;
    private final SharedData sharedData;

    public MyReader(final String name, final SharedData sharedData) {
        this.name = name;
        this.sharedData = sharedData;
    }

    @Override
    public Void call() {
        //do stuff
        return null;
    }
}

public class MyWriter implements Callable<Void> {

    private final String name;
    private final SharedData sharedData;

    public MyWriter(final String name, final SharedData sharedData) {
        this.name = name;
        this.sharedData = sharedData;
    }

    @Override
    public Void call() {
        //do stuff
        return null;
    }
}

一类代表所有作者的工作,另一类代表所有读者的工作。我把它们做成了Callables 而不是线程;这让我想到了下一点。

您不应该使用Thread对象,这些对象级别非常低,难以正确管理。您应该使用新的ExecutorService。所以你的main方法现在看起来像:

public static void main(String[] args) {
    final SharedData<Double> sharedData = new SharedData<Double>(5.7);
    final List<Callable<Void>> myCallables = new LinkedList<Callable<Void>>();

    for (int i = 0; i < 4; ++i) {
        myCallables.add(new MyReader("reader" + i, sharedData));
    }
    for (int i = 0; i < 2; ++i) {
        myCallables.add(new MyWriter("writer" + i, sharedData));
    }

    final ExecutorService executorService = Executors.newFixedThreadPool(myCallables.size());
    final List<Future<Void>> futures;
    try {
        futures = executorService.invokeAll(myCallables);
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
    for (final Future<Void> future : futures) {
        try {
            future.get();
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        } catch (ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    }
}

让我带您浏览这段代码,以便您了解这里发生了什么。在第一行,我们创建了一个新SharedData类——在本例中它是一个SharedData<Double>; 这意味着data它包含的类型是Double; 这可能是任何东西。

接下来我们创建一个Listof Callable,这是我们的工作类将去的地方。然后我们将工作类放入我们List的循环中;请注意,我们为每个创建了相同类的实例,MyReader并且MyWriter- 我们不需要为每个类创建一个类文件。

然后我们创建一个新ExecutorService的线程池,其大小与我们创建的工作类的数量相同——注意我们可以拥有更少的线程。在这种情况下,每个线程将分配一个工作类,然后当它们完成时,它们将被赋予一个新的,直到一切都完成。在我们的例子中,有足够多的线程,因此每个简单的线程都被分配了一个工作类。

我们现在调用invokeAll传入List工作类,这是我们向ExecutorService所有callsCallable询问的地方。这个方法一直阻塞直到一切都完成,它可能会InterrupedException像任何等待的方法一样抛出一个 - 在这种情况下,我们抛出异常并退出。

最后,这就是ExecutorService亮点所在,我们遍历返回的Future类列表并调用-如果运行与该未来相关的工作有任何问题,get这将抛出一个问题。ExecutionException在这种情况下,我们抛出一个异常。

注意Callables 是 type Void(即它们被声明为Callable<Void>),然后过滤到Futures 因为它们是 type Future<Void>。如果您想从每个进程返回一些数据,那么您可以将类型更改为Callable<MyData>,然后get方法Future将返回此数据。

于 2013-03-07T10:42:03.723 回答
0

考虑声明一个结构来保存变量,并传递结构而不是单个变量。这样,您的所有线程都可以处理相同的数据;即:看对方对它的改变。

例子:

static class Data {
    int nr = 0;
    int nw = 0;
    int dr = 0;
    int dw = 0;
}

然后将引用传递给Data而不是原语nr,nw,dr,dw

例子:

Data d = new Data();
P1 r1 = new P1(e, r, w, x, d);
P2 r2 = new P1(e, r, w, x, d);

你的构造函数声明P1将是:

public P1(Semaphore e, Semaphore r, Semaphore w, AnyData pbox, Data d)

确保同步对共享Data的编辑synchronized (d) {...}

此外,也许没有必要复制 classes P1,P2,P3,P4,考虑使用线程的名称来区分它们以用于您的日志记录目的。

希望这可以帮助。

于 2013-03-06T19:13:53.417 回答