0

我正在尝试使用线程在 Java 中实现 Bully 算法。
这是我编写的代码。

package newbully;

/**
 * Entry point of the simulation: builds the workers, picks the initial
 * co-ordinator, then starts one thread per worker.
 */
public class NewBully {

    /**
     * Creates four {@code Thread1} workers whose process id and priority are
     * both {@code index + 1}, runs the initial election over them, and starts
     * each worker on its own thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int processCount = 4;
        final Thread1[] workers = new Thread1[processCount];

        // Ids and priorities are 1-based and identical for every worker.
        int slot = 0;
        while (slot < processCount) {
            int id = slot + 1;
            workers[slot] = new Thread1(new Process(id, id), processCount);
            slot++;
        }

        // Any failure of the initial election is reported and otherwise ignored.
        try {
            Election.initialElection(workers);
        } catch (Exception e) {
            System.out.println("Possibly you are using null references in array");
        }

        // Each worker is run by exactly one dedicated thread.
        for (Thread1 worker : workers) {
            new Thread(worker).start();
        }
    }
}

package newbully;

/**
 * Static state shared by all workers during pinging and elections, plus the
 * start-up election that designates the first co-ordinator.
 */
public class Election {

    // These flags are read and written by several worker threads. They are
    // volatile so that a write made by one thread is visible to the others.
    private static volatile boolean pingFlag = false;
    private static volatile boolean electionFlag = false;
    private static volatile boolean messageFlag = false;

    /** @return whether some worker is currently sending election messages */
    public static boolean isMessageFlag() {
        return messageFlag;
    }

    /** @param messageFlag new value of the election-message-in-progress flag */
    public static void setMessageFlag(boolean messageFlag) {
        Election.messageFlag = messageFlag;
    }

    /** @return whether some worker is currently pinging the co-ordinator */
    public static boolean isPingFlag() {
        return pingFlag;
    }

    /** @param pingFlag new value of the ping-in-progress flag */
    public static void setPingFlag(boolean pingFlag) {
        Election.pingFlag = pingFlag;
    }

    /** @return whether an election is currently under way */
    public static boolean isElectionFlag() {
        return electionFlag;
    }

    /** @param electionFlag new value of the election-in-progress flag */
    public static void setElectionFlag(boolean electionFlag) {
        Election.electionFlag = electionFlag;
    }

    /**
     * Marks the process with the highest priority as co-ordinator. When several
     * processes share the highest priority, the first one in the array wins.
     *
     * <p>The flag is set on the winning process object itself, so the result
     * does not depend on process ids matching array positions.
     *
     * @param t workers to choose from; must be non-empty and free of nulls
     * @throws IllegalArgumentException if {@code t} is empty
     * @throws NullPointerException if {@code t}, an element, or its process is null
     */
    public static void initialElection(Thread1[] t) {
        Process winner = null;
        for (Thread1 candidate : t) {
            Process current = candidate.getProcess();
            // Dereference first so a null process fails fast instead of being skipped.
            int currentPriority = current.getPriority();
            if (winner == null || winner.getPriority() < currentPriority) {
                winner = current;
            }
        }
        if (winner == null) {
            throw new IllegalArgumentException("No processes to elect a co-ordinator from");
        }
        winner.setCoOrdinatorFlag(true);
    }
}

package newbully;

/**
 * Mutable description of one simulated process: its id, its priority, and two
 * status flags (down, co-ordinator).
 */
public class Process {

    // Package-private (not private): other classes in this package access
    // these fields directly as well as through the accessors below.
    int pid;
    int priority;
    boolean downflag, CoOrdinatorFlag;

    /** Creates a process with id 0, priority 0 and both flags cleared. */
    public Process() {
    }

    /**
     * Creates a process that is up and is not the co-ordinator.
     *
     * @param pid      process id
     * @param priority election priority
     */
    public Process(int pid, int priority) {
        this.pid = pid;
        this.priority = priority;
        this.downflag = false;
        this.CoOrdinatorFlag = false;
    }

    /** @return the process id */
    public int getPid() {
        return this.pid;
    }

    /** @param pid new process id */
    public void setPid(int pid) {
        this.pid = pid;
    }

    /** @return the election priority */
    public int getPriority() {
        return this.priority;
    }

    /** @param priority new election priority */
    public void setPriority(int priority) {
        this.priority = priority;
    }

    /** @return {@code true} if this process is marked as down */
    public boolean isDownflag() {
        return this.downflag;
    }

    /** @param downflag whether this process is down */
    public void setDownflag(boolean downflag) {
        this.downflag = downflag;
    }

    /** @return {@code true} if this process is marked as co-ordinator */
    public boolean isCoOrdinatorFlag() {
        return this.CoOrdinatorFlag;
    }

    /** @param isCoOrdinator whether this process is the co-ordinator */
    public void setCoOrdinatorFlag(boolean isCoOrdinator) {
        this.CoOrdinatorFlag = isCoOrdinator;
    }
}

package newbully;

import java.util.*;
import java.io.*;
import java.net.*;

/**
 * One simulated participant in the election, meant to be run on its own thread.
 *
 * <p>While its process is flagged as co-ordinator it answers connections on
 * port 12345; otherwise it repeatedly does a dummy job, pings the co-ordinator
 * on that port, and takes part in an election when the ping fails.
 *
 * <p>Coordination between instances goes through the static flags in
 * {@code Election}; every {@code synchronized} method here locks on this
 * instance only.
 */
public class Thread1 implements Runnable {

    private Process process;
    private int total_processes;
    // Per-instance array; run() only ever fills the slot at index pid - 1.
    ServerSocket[] sock;
    Random r;

    /** @return the process this worker represents */
    public Process getProcess() {
        return process;
    }

    /** @param process the process this worker should represent */
    public void setProcess(Process process) {
        this.process = process;
    }

    /**
     * @param process         the process this worker represents
     * @param total_processes number of processes taking part; also the size of {@code sock}
     */
    public Thread1(Process process, int total_processes) {
        this.process = process;
        this.total_processes = total_processes;
        this.r = new Random();
        this.sock = new ServerSocket[total_processes];
    }

    /** Placeholder called after the co-ordinator's down period; currently does nothing. */
    private void recovery() {
    }

    /**
     * Checks that the co-ordinator is reachable by opening and immediately
     * closing a connection to port 12345 on the local host. Skipped when an
     * election is already flagged. Any exception (connection failure, or an
     * interrupt while waiting) is treated as "co-ordinator is down" and sets
     * the election flag.
     *
     * NOTE(review): wait() and notifyAll() here operate on this instance's
     * monitor, while the ping flag they guard is static and shared. A thread
     * parked in wait() can only be woken by notifyAll() on this same instance,
     * so another instance clearing the flag will not wake it.
     * NOTE(review): the flag is tested with "if", not "while", so it is not
     * re-checked after wait() returns.
     */
    synchronized private void pingCoOrdinator() {
        try {
            if (Election.isPingFlag()) {
                wait();
            }
            if (!Election.isElectionFlag()) {
                Election.setPingFlag(true);
                System.out.println("Process[" + this.process.getPid() + "]: Are you alive?");
                // A successful connect is the whole ping; no data is sent.
                Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345);
                outgoing.close();
                Election.setPingFlag(false);
                notifyAll();
            }
        } catch (Exception ex) {
            //Initiate Election
            System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\nInitiating Election");
            Election.setElectionFlag(true);
            Election.setPingFlag(false);
            notifyAll();
        }
    }

    /**
     * Simulates work by sleeping 100 ms between 1 and 20 times (the count is
     * random). An interrupt is reported and the loop continues.
     */
    synchronized private void executeJob() {
        int temp = r.nextInt(20);
        for (int i = 0; i <= temp; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println("Error Executing Thread:" + process.getPid());
                System.out.println(e.getMessage());
            }
        }
    }

    /**
     * Sends an election message to every process with a higher id by trying to
     * connect to port 10000 + id on the local host. A successful connect counts
     * as a response; nothing is written or read.
     *
     * NOTE(review): as in pingCoOrdinator(), wait()/notifyAll() use this
     * instance's monitor although the message flag is static and shared.
     *
     * @return {@code true} if at least one higher process accepted the connection
     */
    synchronized private boolean sendMessage() {
        boolean response = false;
        int i = 0;
        try {
            if (Election.isMessageFlag()) {
                wait();
            }
            Election.setMessageFlag(true);

            for (i = this.process.getPid() + 1; i <= this.total_processes; i++) {
                try {
                    Socket electionMessage = new Socket(InetAddress.getLocalHost(), 10000 + i);
                    System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "]  responded to election message successfully");
                    electionMessage.close();
                    response = true;
                } catch (Exception ex) {
                    // A failed connect to one process is reported; the remaining ones are still tried.
                    System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message");
                }
            }
            Election.setMessageFlag(false);
            notifyAll();
        } catch (Exception ex1) {
            System.out.println(ex1.getMessage());
        }

        return response;
    }

    /**
     * Acts as co-ordinator: listens on port 12345 and handles 11 connections
     * (counter 0 through 10). For each one it prints "Yes" and, if the client
     * sent the line "Who is the co-ordinator?", writes this process back.
     * After the last connection it clears the co-ordinator flag, marks the
     * process as down, closes its sockets, sleeps for 50 to 500 seconds and
     * then calls recovery().
     *
     * NOTE(review): accepted connections are only closed on the final
     * iteration; earlier ones are left open.
     */
    synchronized private void serve() {
        try {
            //service counter
            ServerSocket s = new ServerSocket(12345);
            for (int counter = 0; counter <= 10; counter++) {
                Socket incoming = s.accept();
                System.out.println("Process[" + this.process.getPid() + "]:Yes");
                Scanner scan = new Scanner(incoming.getInputStream());
                PrintWriter out = new PrintWriter(incoming.getOutputStream(), true);
                if (scan.hasNextLine()) {
                    if (scan.nextLine().equals("Who is the co-ordinator?")) {
                        System.out.print("Process[" + this.process.getPid() + "]:");
                        out.println(this.process);
                    }
                }
                if (counter == 10) {//after serving 10 requests go down
                    this.process.setCoOrdinatorFlag(false);
                    this.process.setDownflag(true);
                    try {
                        incoming.close();
                        s.close();
                        // NOTE(review): this slot is null if the bind in run() failed.
                        sock[this.process.getPid() - 1].close();
                        Thread.sleep((this.r.nextInt(10) + 1) * 50000);//going down
                        recovery();
                    } catch (InterruptedException e) {
                        System.out.println(e.getMessage());
                    }
                }
            }
        } catch (IOException ex) {
            System.out.println(ex.getMessage());
        }
    }

    /**
     * Main loop. Binds this process's election port (10000 + pid) once, then
     * loops forever: serve() while flagged as co-ordinator, otherwise work,
     * ping, and run an election when one is flagged. A worker that gets no
     * response from any higher process makes itself co-ordinator.
     *
     * NOTE(review): the election socket is bound only here, before the loop,
     * and accept() is never called on it in this class. serve() closes it when
     * the co-ordinator goes down and it is not re-opened afterwards.
     */
    @Override
    public void run() {
        try {
            sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid());
        } catch (IOException ex) {
            // Bind failure is only reported; the loop below still runs.
            System.out.println(ex.getMessage());
        }
        while (true) {
            if (process.isCoOrdinatorFlag()) {
                //serve other processes
                serve();
            } else {
                while (true) {
                    //Execute some task
                    executeJob();

                    //Ping the co-ordinator
                    pingCoOrdinator();

                    if (Election.isElectionFlag()) {
                        if (!sendMessage()) {//elect self as co-ordinator
                            System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]");
                            this.process.setCoOrdinatorFlag(true);
                            Election.setElectionFlag(false);
                            // Leave the worker loop so the outer loop enters serve().
                            break;
                        }
                    }
                }
            }
        }
    }
}

当我用创建的 4 个线程运行这段代码时,一些线程在调用 wait() 后一直处于等待状态,notifyAll() 并没有唤醒它们。谁能解释一下为什么会这样?

4

2 回答 2

7

每个线程都是在它自己的 Thread1 实例上调用 wait()。这意味着当您在某个 Thread1 实例上调用 notifyAll() 时,只会唤醒正在该实例上等待的那一个线程,而不会唤醒其他所有线程。

您需要做的是让所有 Thread1 对象都在同一个公共对象上调用 wait(),并且也在这同一个对象上调用 notifyAll()。

当然,在调用 wait() 或 notifyAll() 时,您必须先在该公共对象上进行同步;否则会抛出 IllegalMonitorStateException。

// Object to be used as a lock; pass this to all Thread1 instances
// so that every thread waits and notifies on the same monitor.
Object lock = new Object();

// Somewhere else in your code
// NOTE: wait() may also return without a matching notify (spurious wakeup),
// so real code normally re-checks its condition in a while loop around wait().
synchronized (lock) {
    lock.wait();
}

// Where you want to notify
// This wakes every thread currently waiting on lock.
synchronized (lock) {
    lock.notifyAll();
}
于 2012-04-18T12:28:19.200 回答
3

notify()(或 notifyAll())和 wait() 都必须写在针对同一个监视器的 synchronized 块中。

例如:

// Call wait() on the same object the block synchronizes on. A bare wait()
// means this.wait(), which throws IllegalMonitorStateException unless
// this == myLock.
synchronized(myLock) {
    myLock.wait();
}

..................

// Call notifyAll() on the same object the block synchronizes on. A bare
// notifyAll() means this.notifyAll(), which throws
// IllegalMonitorStateException unless this == myLock.
synchronized(myLock) {
    myLock.notifyAll();
}
于 2012-04-18T12:22:53.227 回答