欺负算法的可疑代码输出

我正在为Java编写Bully算法程序
这是代码:

package newbully; public class NewBully { public static void main(String[] args) { int total_processes = 6; RunningThread[] t = new RunningThread[total_processes]; for (int i = 0; i < total_processes; i++) { t[i] = new RunningThread(new Process(i+1, i+1), total_processes);//passing process id, priority, total no. of processes to running thread } try { Election.initialElection(t); } catch (Exception e) { System.out.println("Possibly you are using null references in array"); } for (int i = 0; i  Recovered from Crash"); //Find current co-ordinator. } synchronized private void pingCoOrdinator() { try { if (Election.isPingFlag()) { synchronized (Election.lock) { Election.lock.wait(); } } if (!Election.isElectionFlag()) { Election.setPingFlag(true); System.out.println("Process[" + this.process.getPid() + "]: Are you alive?"); Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345); outgoing.close(); Election.setPingFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } } } catch (Exception ex) { //Initiate Election System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\nInitiating Election"); Election.setElectionFlag(true); Election.setPingFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } } } synchronized private void executeJob() { int temp = r.nextInt(20); for (int i = 0; i <= temp; i++) { try { Thread.sleep(700); } catch (InterruptedException e) { System.out.println("Error Executing Thread:" + process.getPid()); System.out.println(e.getMessage()); } } } synchronized private boolean sendMessage() { boolean response = false; int i = 0; try { if (Election.isMessageFlag()) { synchronized (Election.lock) { Election.lock.wait(); } } Election.setMessageFlag(true); if (Election.isElectionFlag()) { for (i = this.process.getPid() + 1; i  Process[" + i + "] responded to election message successfully"); electionMessage.close(); response = true; } catch (Exception ex) { System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message"); } } } Election.setMessageFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } } catch (Exception ex1) { System.out.println(ex1.getMessage()); } return response; } synchronized private void serve() { try { //service counter Socket incoming = null; ServerSocket s = new ServerSocket(12345); for (int counter = 0; counter < 10; counter++) { incoming = s.accept(); System.out.println("Process[" + this.process.getPid() + "]:Yes"); Scanner scan = new Scanner(incoming.getInputStream()); PrintWriter out = new PrintWriter(incoming.getOutputStream(), true); if (scan.hasNextLine()) { if (scan.nextLine().equals("Who is the co-ordinator?")) { System.out.print("Process[" + this.process.getPid() + "]:"); out.println(this.process); } } } //after serving 10 requests go down for random time this.process.setCoOrdinatorFlag(false); this.process.setDownflag(true); try { incoming.close(); s.close(); sock[this.process.getPid() - 1].close(); Thread.sleep((this.r.nextInt(10) + 1) * 1000000);//going down recovery(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } } catch (IOException ex) { System.out.println(ex.getMessage()); } } @Override public void run() { try { sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid()); } catch (IOException ex) { System.out.println(ex.getMessage()); } while (true) { if (process.isCoOrdinatorFlag()) { //serve other processes serve(); } else { while (true) { //Execute some task executeJob(); //Ping the co-ordinator pingCoOrdinator(); if (Election.isElectionFlag()) { if (!sendMessage()) {//elect self as co-ordinator System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]"); this.process.setCoOrdinatorFlag(true); Election.setElectionFlag(false); break; } } } } } } } package newbully; public class Election { private static boolean pingFlag = false; private static boolean electionFlag = false; private static boolean messageFlag = false; public static final Object lock = new Object(); public static boolean isMessageFlag() { return messageFlag; } public static void setMessageFlag(boolean messageFlag) { Election.messageFlag = messageFlag; } public static boolean isPingFlag() { return pingFlag; } public static void setPingFlag(boolean pingFlag) { Election.pingFlag = pingFlag; } public static boolean isElectionFlag() { return electionFlag; } public static void setElectionFlag(boolean electionFlag) { Election.electionFlag = electionFlag; } public static void initialElection(RunningThread[] t) { Process temp = new Process(-1, -1); for (int i = 0; i < t.length; i++) { if (temp.getPriority() < t[i].getProcess().getPriority()) { temp = t[i].getProcess(); } } t[temp.pid - 1].getProcess().CoOrdinatorFlag = true; } } package newbully; public class Process { int pid; boolean downflag,CoOrdinatorFlag; public boolean isCoOrdinatorFlag() { return CoOrdinatorFlag; } public void setCoOrdinatorFlag(boolean isCoOrdinator) { this.CoOrdinatorFlag = isCoOrdinator; } int priority; public boolean isDownflag() { return downflag; } public void setDownflag(boolean downflag) { this.downflag = downflag; } public int getPid() { return pid; } public void setPid(int pid) { this.pid = pid; } public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } public Process() { } public Process(int pid, int priority) { this.pid = pid; this.downflag = false; this.priority = priority; this.CoOrdinatorFlag = false; } } 

这是输出:

 //--When delay in executeJob() method is 100 Process[4]: Are you alive? Process[6]:Yes Process[4]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[5]: Are you alive? Process[6]:Yes Process[1]: Are you alive? Process[6]:Yes Process[4]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[2]: Are you alive? Process[6]:Yes Process[5]: Are you alive? Process[6]:Yes Process[1]: Are you alive? process[1]: -> Co-Ordinator is down Initiating Election Process[1] -> Process[2] responded to election message successfully Process[1] -> Process[3] responded to election message successfully Process[1] -> Process[4] responded to election message successfully Process[1] -> Process[5] responded to election message successfully Process[1] -> Process[6] did not respond to election message Process[2] -> Process[3] responded to election message successfully Process[3] -> Process[4] responded to election message successfully Process[4] -> Process[5] responded to election message successfully Process[2] -> Process[4] responded to election message successfully Process[2] -> Process[5] responded to election message successfully Process[3] -> Process[5] responded to election message successfully Process[5] -> Process[6] did not respond to election message New Co-Ordinator: Process[5] New Co-Ordinator: Process[1] Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind //--When delay in executeJob() method is 700 Process[3]: Are you alive? Process[6]:Yes Process[5]: Are you alive? Process[6]:Yes Process[2]: Are you alive? Process[1]: Are you alive? Process[6]:Yes Process[6]:Yes Process[5]: Are you alive? Process[1]: Are you alive? Process[6]:Yes Process[6]:Yes Process[4]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[2]: Are you alive? Process[6]:Yes Process[1]: Are you alive? Process[6]:Yes Process[4]: Are you alive? process[4]: -> Co-Ordinator is down Initiating Election Process[4] -> Process[5] responded to election message successfully Process[4] -> Process[6] did not respond to election message Process[5] -> Process[6] did not respond to election message New Co-Ordinator: Process[5] Process[1]: Are you alive? Process[5]:Yes Process[1]: Are you alive? Process[5]:Yes Process[3]: Are you alive? Process[5]:Yes Process[2]: Are you alive? Process[5]:Yes Process[1]: Are you alive? Process[5]:Yes Process[4]: Are you alive? Process[5]:Yes Process[2]: Are you alive? Process[5]:Yes Process[4]: Are you alive? Process[5]:Yes Process[3]: Are you alive? Process[5]:Yes Process[3]: Are you alive? Process[5]:Yes Process[2]: Are you alive? process[2]: -> Co-Ordinator is down Initiating Election Process[2] -> Process[3] responded to election message successfully Process[2] -> Process[4] responded to election message successfully Process[2] -> Process[5] did not respond to election message Process[2] -> Process[6] did not respond to election message Process[3] -> Process[4] responded to election message successfully Process[3] -> Process[5] did not respond to election message Process[3] -> Process[6] did not respond to election message Process[1] -> Process[2] responded to election message successfully Process[1] -> Process[3] responded to election message successfully Process[1] -> Process[4] responded to election message successfully Process[1] -> Process[5] did not respond to election message Process[1] -> Process[6] did not respond to election message Process[2] -> Process[3] responded to election message successfully Process[2] -> Process[4] responded to election message successfully Process[2] -> Process[5] did not respond to election message Process[2] -> Process[6] did not respond to election message Process[4] -> Process[5] did not respond to election message Process[4] -> Process[6] did not respond to election message New Co-Ordinator: Process[4] Process[3]: Are you alive? Process[4]:Yes Process[3]: Are you alive? Process[4]:Yes Process[1]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[1]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[3]: Are you alive? Process[4]:Yes Process[1]: Are you alive? Process[4]:Yes Process[3]: Are you alive? process[3]: -> Co-Ordinator is down Initiating Election Process[3] -> Process[4] did not respond to election message Process[3] -> Process[5] did not respond to election message Process[3] -> Process[6] did not respond to election message New Co-Ordinator: Process[3] New Co-Ordinator: Process[2] Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind 

最后,我开始得到Address already in use: JVM_BinAddress already in use: JVM_BinexceptionAddress already in use: JVM_Bin
另外,如果我们在提出exception之前检查最新选出的协调员,那么在要求is co-ordinator alive?之前,它会选择两次is co-ordinator alive?
我确信,当协调员死亡时,我已经提供了足够的延迟,以便它不会在两者之间醒来。
当我给予额外的dealy然后程序继续前进,否则它在中间停止。
那么为什么必须出现这个问题呢?

我找到了例外的原因
它正在发生,因为如果你在exception消息之前仔细查看输出,它就选择了协调员2ce。
每当一个Thread被选为协调器时,它就会在端口12345打开一个ServerSocket。
因为它发生了2ce,它可能会抛出exception。
但我不明白……为什么选择2ce?

两种情况下的错误消息是“地址已在使用中:JVM_Bind”。

此消息通常是java.net.BindException的一部分,当您尝试创建/打开Socket并且端口已在使用中时,会抛出该消息。 在这种情况下,您可能尝试打开或创建两次相同的套接字。

这可能发生,因为在打开和关闭套接字之间,exception会停止调用socket.close()。 在这种情况下,当您为选举领导者创建套接字时,但由于主机被“崩溃”,因此抛出exception,因此永远不会调用close()。

我想你需要添加这一行

electionMessage.close(); 发送消息的这一部分的catch子句。

 try { Socket electionMessage = new Socket(InetAddress.getLocalHost(), 10000 + i); System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] responded to election message successfully"); electionMessage.close(); response = true; } catch (Exception ex) { //Add close here System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message"); } 

此外,我建议添加接近所有其他相关的catch条款,以防它在其他地方发生这种情况,并且始终是避免相关问题的良好做法。 我还建议明确指出你想要在每个地方捕获哪些例外,这样你就不会被其他陷阱所困扰。

我希望这是一个很好的起点。


编辑以回应第一条评论

我认为您“锁定”线程的方式存在问题。 您依靠布尔标志来告诉您已达到代码中的特定点。 但布尔标志本身不是在同步块中控制的,也不是代码上的任何其他锁。 因此,多个线程可以通过锁定并导致意外事件,例如多次尝试打开同一个套接字。

您正在使用带有Object的代码进行锁定

 if (Election.isMessageFlag()) { synchronized (Election.lock) { Election.lock.wait(); } } Election.setMessageFlag(true); if (Election.isElectionFlag()) { // Do Stuff // Open/close Sockets etc } Election.setMessageFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } 

这是多个线程可以在设置标志之前传递第一个if语句(在下一行!)以使后续线程等待。

但是,如果您使用ReentrantLock,则应使用以下代码:

 lock.lock(); // block until condition holds try { // Do Stuff // OPen CLose Sockets etc } finally { lock.unlock() } 

显然,您可能仍需要设置一些标志来说明选举是否正在进行,但请确保使用真实锁定(例如ReentrantLock)或在适当的同步块内保护正在运行的代码。

希望这可以帮助

在参考了上述所有评论后,我发布了更正的代码,完全可以让其他人参考。
欢迎代码中的任何改进建议……

 package newbully; public class NewBully { public static void main(String[] args) { int total_processes = 6; RunningThread[] t = new RunningThread[total_processes]; for (int i = 0; i < total_processes; i++) { t[i] = new RunningThread(new Process(i+1, i+1), total_processes);//passing process id, priority, total no. of processes to running thread } try { Election.initialElection(t); } catch (NullPointerException e) { System.out.println(e.getMessage()); } for (int i = 0; i < total_processes; i++) { new Thread(t[i]).start();//start every thread } } } package newbully; import java.util.concurrent.locks.ReentrantLock; public class Election { public static ReentrantLock pingLock = new ReentrantLock(); public static ReentrantLock electionLock = new ReentrantLock(); private static boolean electionFlag = false; //By default no election is going on private static boolean pingFlag = true; //By default I am allowed to ping public static Process electionDetector; public static Process getElectionDetector() { return electionDetector; } public static void setElectionDetector(Process electionDetector) { Election.electionDetector = electionDetector; } public static boolean isPingFlag() { return pingFlag; } public static void setPingFlag(boolean pingFlag) { Election.pingFlag = pingFlag; } public static boolean isElectionFlag() { return electionFlag; } public static void setElectionFlag(boolean electionFlag) { Election.electionFlag = electionFlag; } public static void initialElection(RunningThread[] t) { Process temp = new Process(-1, -1); for (int i = 0; i < t.length; i++) { if (temp.getPriority() < t[i].getProcess().getPriority()) { temp = t[i].getProcess(); } } t[temp.pid - 1].getProcess().CoOrdinatorFlag = true; } } package newbully; import java.util.*; import java.io.*; import java.net.*; public class RunningThread implements Runnable { private Process process; private int total_processes; private static boolean messageFlag[]; ServerSocket[] sock; Random r; public Process getProcess() { return process; } public void setProcess(Process process) { this.process = process; } public RunningThread(Process process, int total_processes) { this.process = process; this.total_processes = total_processes; this.r = new Random(); this.sock = new ServerSocket[total_processes]; RunningThread.messageFlag = new boolean[total_processes]; for (int i = 0; i < total_processes; i++) { RunningThread.messageFlag[i] = false; } } synchronized private void recovery() { while (Election.isElectionFlag());//if election is going on then wait System.out.println("Process[" + this.process.getPid() + "]: -> Recovered from Crash"); //Find current co-ordinator. try { Election.pingLock.lock(); Election.setPingFlag(false); Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345); Scanner scan = new Scanner(outgoing.getInputStream()); PrintWriter out = new PrintWriter(outgoing.getOutputStream(), true); System.out.println("Process[" + this.process.getPid() + "]:-> Who is the co-ordinator?"); out.println("Who is the co-ordinator?"); out.flush(); String pid = scan.nextLine(); String priority = scan.nextLine(); if (this.process.getPriority() > Integer.parseInt(priority)) { //Bully Condition out.println("Resign"); out.flush(); System.out.println("Process[" + this.process.getPid() + "]: Resign -> Process[" + pid + "]"); String resignStatus = scan.nextLine(); if (resignStatus.equals("Successfully Resigned")) { this.process.setCoOrdinatorFlag(true); sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid()); System.out.println("Process[" + this.process.getPid() + "]: -> Bullyed current co-ordinator Process[" + pid + "]"); } } else { out.println("Don't Resign"); out.flush(); } Election.pingLock.unlock(); return; } catch (IOException ex) { System.out.println(ex.getMessage()); } } synchronized private void pingCoOrdinator() { try { Election.pingLock.lock(); if (Election.isPingFlag()) { System.out.println("Process[" + this.process.getPid() + "]: Are you alive?"); Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345); outgoing.close(); } } catch (Exception ex) { Election.setPingFlag(false); Election.setElectionFlag(true); Election.setElectionDetector(this.process); //Initiate Election System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\n" + "process[" + this.process.getPid() + "]: ->Initiating Election"); } finally { Election.pingLock.unlock(); } } private void executeJob() { int temp = r.nextInt(20); for (int i = 0; i <= temp; i++) { try { Thread.sleep((temp + 1) * 100); } catch (InterruptedException e) { System.out.println("Error Executing Thread:" + process.getPid()); System.out.println(e.getMessage()); } } } synchronized private boolean sendMessage() { boolean response = false; try { Election.electionLock.lock(); if (Election.isElectionFlag() && !RunningThread.isMessageFlag(this.process.getPid() - 1) && this.process.priority >= Election.getElectionDetector().getPriority()) { for (int i = this.process.getPid() + 1; i <= this.total_processes; i++) { try { Socket electionMessage = new Socket(InetAddress.getLocalHost(), 10000 + i); System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] responded to election message successfully"); electionMessage.close(); response = true; } catch (IOException ex) { System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message"); } catch (Exception ex) { System.out.println(ex.getMessage()); } } this.setMessageFlag(true, this.process.getPid() - 1);//My message sending is done Election.electionLock.unlock(); return response; } else { throw new Exception(); } } catch (Exception ex1) { Election.electionLock.unlock(); return true; } } public static boolean isMessageFlag(int index) { return RunningThread.messageFlag[index]; } public static void setMessageFlag(boolean messageFlag, int index) { RunningThread.messageFlag[index] = messageFlag; } synchronized private void serve() { try { boolean done = false; Socket incoming = null; ServerSocket s = new ServerSocket(12345); Election.setPingFlag(true); int temp = this.r.nextInt(5) + 5;// min 5 requests and max 10 requests for (int counter = 0; counter < temp; counter++) { incoming = s.accept(); if (Election.isPingFlag()) { System.out.println("Process[" + this.process.getPid() + "]:Yes"); } Scanner scan = new Scanner(incoming.getInputStream()); PrintWriter out = new PrintWriter(incoming.getOutputStream(), true); while (scan.hasNextLine() && !done) { String line = scan.nextLine(); if (line.equals("Who is the co-ordinator?")) { System.out.println("Process[" + this.process.getPid() + "]:-> " + this.process.getPid()); out.println(this.process.getPid()); out.flush(); out.println(this.process.getPriority()); out.flush(); } else if (line.equals("Resign")) { this.process.setCoOrdinatorFlag(false); out.println("Successfully Resigned"); out.flush(); incoming.close(); s.close(); System.out.println("Process[" + this.process.getPid() + "]:-> Successfully Resigned"); return; } else if (line.equals("Don't Resign")) { done = true; } } } //after serving 5-10 requests go down for random time this.process.setCoOrdinatorFlag(false); this.process.setDownflag(true); try { incoming.close(); s.close(); sock[this.process.getPid() - 1].close(); Thread.sleep(15000);//(this.r.nextInt(10) + 1) * 10000);//going down recovery(); } catch (Exception e) { System.out.println(e.getMessage()); } } catch (IOException ex) { System.out.println(ex.getMessage()); } } @Override public void run() { try { sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid()); } catch (IOException ex) { System.out.println(ex.getMessage()); } while (true) { if (process.isCoOrdinatorFlag()) { //serve other processes serve(); } else { while (true) { //Execute some task executeJob(); //Ping the co-ordinator pingCoOrdinator(); //Do Election if (Election.isElectionFlag()) { if (!sendMessage()) {//elect self as co-ordinator Election.setElectionFlag(false);//Election is Done System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]"); this.process.setCoOrdinatorFlag(true); for (int i = 0; i < total_processes; i++) { RunningThread.setMessageFlag(false, i); } break; } } } } } } } package newbully; public class Process { int pid; boolean downflag,CoOrdinatorFlag; public boolean isCoOrdinatorFlag() { return CoOrdinatorFlag; } public void setCoOrdinatorFlag(boolean isCoOrdinator) { this.CoOrdinatorFlag = isCoOrdinator; } int priority; public boolean isDownflag() { return downflag; } public void setDownflag(boolean downflag) { this.downflag = downflag; } public int getPid() { return pid; } public void setPid(int pid) { this.pid = pid; } public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } public Process() { } public Process(int pid, int priority) { this.pid = pid; this.downflag = false; this.priority = priority; this.CoOrdinatorFlag = false; } }