我一直在做一个关于 Java 多线程的学校作业。我卡在其中的一个任务是,我们需要在不同的组中创建多个线程,并且一旦每个组中有 4 个线程,它们才能被释放以一致地工作,否则它们必须被搁置/等待。例如:
- 线程 a、b、c 加入第 7 组,它们都被搁置/等待。
- 线程 d 加入组 7,所有四个线程 (a,b,c,d) 都被通知终止。
- 线程 e,f,g,h,i 加入第 8 组,在这种情况下,当线程 i 处于等待状态时,e,f,g,h 将发出终止信号。
- 线程 j 加入第 7 组,处于等待状态。
这是我完成的一般任务。我正在处理的任务要求我们释放组中的前 4 个初始线程,其余的应该等到前面的 4 个线程调用完成()。
例如,3 个线程加入组 65,它们处于等待状态。另一个线程加入组 65,所有 4 个线程一起释放。现在有 4 个线程正在工作(已终止)。现在线程 e,f,g,h,i,j,k,l 加入组 65。所有线程都等待直到 e,f,g,h 调用了 finished() 方法。
这是我到目前为止所做的:
ExtrinsicSync.java:
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
public class ExtrinsicSync {
private HashMap<Integer, ConditionWrapper> groupThreadCount;
private ReentrantLock monitor;
private int count = 0;
ExtrinsicSync() {
groupThreadCount = new HashMap<>();
monitor = new ReentrantLock();
}
@Override
public void waitForThreadsInGroup(int groupId) {
monitor.lock();
if (!groupThreadCount.containsKey(groupId))
groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
condWrapper.setValue(condWrapper.getValue() + 1);
if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
{
condWrapper.getCondition().signalAll();
condWrapper.setInitialStatus(false);
System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
} else {
System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
try { condWrapper.getCondition().await(); }
catch (InterruptedException e) { e.printStackTrace(); }
}
monitor.unlock();
}
@Override
public void finished(int groupId) {
monitor.lock();
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
if(!condWrapper.getInitialStatus())
{
condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
if(condWrapper.getFinishedCount() == 4)
{
condWrapper.setFinishedCount(0);
condWrapper.getCondition().signalAll();
System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
}
}
monitor.unlock();
}
外部同步测试.java:
import org.junit.Test;
import java.util.EnumMap;
class TestTask1 implements Runnable{
final int group;
final ExtrinsicSync s1;
TestTask1(int group, ExtrinsicSync s1)
{
this.group = group;
this.s1 = s1;
}
public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}
public class ExtrinsicSyncTest {
@Test
public void testPhaseThreethreads() {
int nThreads = 22;
Thread t[] = new Thread[nThreads];
final ExtrinsicSync s1 = new ExtrinsicSync();
for(int i = 0; i < nThreads/2; i++)
(t[i] = new Thread(new TestTask1(66, s1))).start();
for(int i = nThreads/2; i < nThreads; i++)
(t[i] = new Thread(new TestTask1(70, s1))).start();
for (Thread ti : t)
{
try { ti.join(100); }
catch (Exception e) { System.out.println(e); }
}
EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);
for (Thread.State s : Thread.State.values())
threadsInThisState.put(s, 0);
for (Thread ti : t)
{
Thread.State state = ti.getState();
int n = threadsInThisState.get(state);
threadsInThisState.put(state, n + 1);
}
System.out.println("threadsInThisState: " + threadsInThisState.toString() );
}
}
ConditionWrapper.java:
import java.util.concurrent.locks.Condition;
public class ConditionWrapper {
private Condition cond;
private Integer value;
private Integer finishedCount;
private boolean initialThreads;
public ConditionWrapper(Condition condition)
{
this.cond = condition;
this.value = 0;
this.finishedCount = 0;
this.initialThreads = true;
}
// Returns the condition object of current request
public Condition getCondition()
{
return this.cond;
}
// Gets the current counter of threads waiting in this queue.
public Integer getValue()
{
return this.value;
}
// Sets the given value. Used for resetting the counter.
public void setValue(int value) { this.value = value; }
// Sets the counter to help keep track of threads which called finished() method
public void setFinishedCount(int count) { this.finishedCount = count; }
// Gets the finished count.
public Integer getFinishedCount() { return this.finishedCount; }
// This flag is to identify initial threads of a group
public boolean getInitialStatus() { return initialThreads; }
public void setInitialStatus(boolean val) { this.initialThreads = val; }
}
我遇到的问题是我能够释放每个组的前四个线程,但不知何故,某处有 2 个线程被随机终止,我无法弄清楚发生了什么。例如,将上面的 22 个线程测试用例分为两组,只有 8 个线程应该被终止,其余的线程等待。
但是这里有 10 个线程被终止。我不明白发生了什么事。我已尽我所能将代码精简到最低限度。