0

我希望了解如何使用 Java CountDownLatch 来控制线程的执行。

我有两节课。一个被称为Poller,另一个是Referendum。线程在Referendum类中创建,它们的run()方法包含在Poller类中。

在 Poller 和 Referendum 类中,我通过 import 导入了 java 倒计时锁存器java.util.concurrent.CountDownLatch

我主要想了解为什么以及在哪里需要应用*.countDown();and语句,以及了解我是否在构造函数中正确初始化了 countDownLatch。*.await();Poller

这两个类的完整代码是:

import java.util.concurrent.CountDownLatch;

public class Poller extends Thread
{
private String id;                  // pollster id
private int pollSize;               // number of samples
private int numberOfPolls;          // number of times to perform a poll
private Referendum referendum;      // the referendum (implies voting population)
private int sampledVotes[];         // the counts of votes for or against
static CountDownLatch pollsAreComplete; //the CountDownLatch


/**
 * Constructor for polling organisation.
 * @param r A referendum on which the poller is gathering stats
 * @param id The name of this polling organisation
 * @param pollSize The size of the poll this poller will use
 * @param pollTimes The number of times this poller will conduct a poll
 * @param aLatch The coutn down latch that prevents the referendum results from being published
 */

public Poller(Referendum r, String id, int pollSize, int pollTimes, CountDownLatch aLatch)
{
    this.referendum = r;
    this.id = id;
    this.pollSize = pollSize;
    this.numberOfPolls = pollTimes;
    this.pollsAreComplete = aLatch;
    aLatch = new CountDownLatch(3);

    // for and against votes to be counted
    sampledVotes = new int[2];
}


// getter for numberOfPolls
public int getNumberOfPolls()
{
    return numberOfPolls;
}

@Override
//to use the countdown latch
public void run()
{      
    for (int i = 0; i < getNumberOfPolls(); i++)
    {
        resetVotes();
        pollVotes();
        publishPollResults();
    }
}

// make sure all sampledVotes are reset to zero
protected void resetVotes()
{
    // initialise the vote counts in the poll
    for (int i = 0; i < sampledVotes.length; i++)
    {
        sampledVotes[i] = 0;
    }
}

// sampling the way citizens will vote in a referendum
protected void pollVotes()
{
    for (int n = 0; n < pollSize; n++)
    {
        Citizen c = referendum.pickRandomCitizen();

        //As things stand, pickRandomCitizen can return null
        //because we haven't protected access to the collection
        if (c != null)
        {
            sampledVotes[c.voteFor()]++;
        }
    }
}

protected void publishPollResults()
{
    int vfor = 100 * sampledVotes[Referendum.FOR] / pollSize;

    int vagainst = 100 * sampledVotes[Referendum.AGAINST] / pollSize;

    System.out.printf("According to %-20s \t(", this.id + ":");

    System.out.print("FOR " + vfor);

    try
    {
        Thread.sleep(1000);
    } catch (Exception e)
    {
        e.printStackTrace();
    }

    System.out.println(", AGAINST " + vagainst + ")");
 }
}

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;  


public class Referendum
{
private List<Citizen> citizens;         //voters
private List<Poller> pollers;           //vote samplers
public static final int FOR = 0;        //index for votes array
public static final int AGAINST = 1;    //index for votes array
private int votes[];                    //for and against votes counters


public Referendum(int population)
{
    citizens = new LinkedList<Citizen>();
    pollers = new LinkedList<Poller>();

    // initialise the referendum with the population
    for (int i = 0; i < population; i++)
    {
        Citizen c = new Citizen(i % 4); //suppose equal party membership
        citizens.add(c);
    }

    votes = new int[2]; //in this example, only For or Against
}

public void addPoller(Poller np)
{
    pollers.add(np);
}

public Citizen removeCitizen(int i)
{
    return citizens.remove(i);
}

public List<Poller> getPollers()
{
    return pollers;
}

public void startPollsWithLatch()
{
   //create some poller threads that use a latch
    addPoller(new Poller(this, "The Daily Day", 100, 3, Poller.pollsAreComplete));
    addPoller(new Poller(this, "Stats people", 100, 3, Poller.pollsAreComplete));
    addPoller(new Poller(this, "TV Pundits", 100, 3, Poller.pollsAreComplete));

    // start the polls
    for (Poller p : pollers)
    {
        p.start();

    }
}



// pick a citizen randomly - access not controlled yet
public Citizen pickRandomCitizen()
{
    //TODO add code to this method for part (b)

    Citizen randomCitizen;
    // first get a random index
    int index = (int) (Math.random() * getPopulationSize());
    randomCitizen = citizens.remove(index);

    return randomCitizen;
}

// Counting the actual votes cast in the referendum
public void castVotes()
{
    for (int h = 0; h < getPopulationSize(); h++)
    {
        Citizen c = citizens.get(h);

        votes[c.voteFor()]++;
    }
}

// tell the size of population
public int getPopulationSize()
{
    return citizens.size();
}

// display the referendum results
public void revealResults()
{
    System.out.println(" **** The Referendum Results are out! ****");

    System.out.println("FOR");
    System.out.printf("\t %.2f %%\n", 100.0 * votes[FOR] / getPopulationSize());

    System.out.println("AGAINST");
    System.out.printf("\t %.2f %%\n", 100.0 * votes[AGAINST] / getPopulationSize());
}

public static void main(String[] args)
{
    // Initialise referendum. The number of people
    // has been made smaller here to reduce the simulation time.
    Referendum r = new Referendum(50000);


    r.startPollsWithLatch();

    r.castVotes();

    // reveal the results of referendum
    r.revealResults();
 }
}

简而言之......所有线程必须在执行publishPollResults();之前执行语句revealResults();

4

2 回答 2

0

好的,

现在,如果 publishPollResults 必须由所有人在 reavelResults 之前完成,那么您只需要等待 reveal 方法中的正确计数即可。但要做到这一点,闩锁也必须与公投对象共享,而不仅仅是轮询者。

因此,让公投创建闩锁并将其传递给轮询器:

public class Referendum
{
  CountDownLatch pollsAreComplete;
  ...

    public void startPollsWithLatch()
    {
        pollsAreComplete = new CountDownLatch(3); //create new latch to know when the voting is done
       //create some poller threads that use a latch
        addPoller(new Poller(this, "The Daily Day", 100, 3, pollsAreComplete)); //pass it to pollers
        addPoller(new Poller(this, "Stats people", 100, 3, pollsAreComplete));
        addPoller(new Poller(this, "TV Pundits", 100, 3, pollsAreComplete));

        // start the polls
        for (Poller p : pollers)
        {
            p.start();
        }
    }

    public void revealResults()
    {

    pollsAreComplete.await(); //we can pass this line only if the latch count went to 0

    System.out.println(" **** The Referendum Results are out! ****");
    ....
    }

}

所以轮询器应该共享闩锁。您使用的是 OKish 的静态变量,但您希望能够在不同的公投中使用轮询器。因此,最好将它作为实例字段并将其传递给构造函数(您从构造函数开始,但随后将值传递给静态变量,这是没有意义的(实际上它始终为空)。

public class Poller extends Thread
{
    ...
    private CountDownLatch pollsAreComplete; //the CountDownLatch shared with referendum

    public Poller(Referendum r, String id, int pollSize, int pollTimes, CountDownLatch aLatch)
    {
        ...
        this.pollsAreComplete = aLatch;
    }

    public void run()
    {      
        for (int i = 0; i < getNumberOfPolls(); i++)
        {
            resetVotes();
            pollVotes();
            publishPollResults();
        }
        pollsAreComplete.countDown(); //voting is finished, let the referendum publish the results.
    }

}

因此,一旦轮询器完成其工作,它就会降低闩锁,当所有这些都完成后,公投可以继续并打印结果。

请注意,所有 Poller 线程都会发布 3 次他们的结果(就像他们有 for 循环一样),只有当所有 3 个循环都结束时,他们才会发出公投信号。

如果您想要公投的 3 个单独阶段,如果使用锁存器将很难实现,因为一旦它降至 0,它就无法重置。

于 2015-04-09T23:05:43.327 回答
0

如果我理解正确,您希望在显示结果之前执行所有线程。这需要传递给每个线程的构造函数的类中的单个CountDownLatch实例。一旦结束轮询,每个都会调用锁存器,并调用睡眠直到锁存器倒计时达到零:ReferendumPollerPollercountdown()Referendumawait()

class Referendum {
  private CountDownLatch latch;
  public CountDownLatch getLatch() {
    return latch;
  }

  // ...

  public void startVotesWithLatch() {
    // You don't need to pass the latch in constructor,
    // as you can retrieve it from the referendum object passed
    addPoller(new Poller(this, "Stats people", 100, 3));
    // Add other pollers
    // Start all pollers
    for (Poller p : pollers) {
      p.start();
    }
    // Wait for all pollers to finish
    latch.await();
  }
}

并在Poller类中删除不需要的闩锁变量,然后在publishPollResults()方法中:

public void publishPollResults() {
  // Everything stays the same here, except we decrease the latch
  // when finished...
  referendum.getLatch().countDown();
}

但是请注意,这种类型的同步非常简单,不一定需要 a CountDownLatch,您可以简单地生成Poller线程,然后调用join()主线程(这将暂停主线程,直到子线程完成执行)。

于 2015-04-09T23:05:52.980 回答