2

我一直在做一个关于 Java 多线程的学校作业。我卡在其中的一个任务是,我们需要在不同的组中创建多个线程,并且一旦每个组中有 4 个线程,它们才能被释放以一致地工作,否则它们必须被搁置/等待。例如:

  • 线程 a、b、c 加入第 7 组,它们都被搁置/等待。
  • 线程 d 加入组 7,所有四个线程 (a,b,c,d) 都被通知终止。
  • 线程 e,f,g,h,i 加入第 8 组,在这种情况下,当线程 i 处于等待状态时,e,f,g,h 将发出终止信号。
  • 线程 j 加入第 7 组,处于等待状态。

这是我完成的一般任务。我正在处理的任务要求我们释放组中的前 4 个初始线程,其余的应该等到前面的 4 个线程调用完成()。

例如,3 个线程加入组 65,它们处于等待状态。另一个线程加入组 65,所有 4 个线程一起释放。现在有 4 个线程正在工作(已终止)。现在线程 e,f,g,h,i,j,k,l 加入组 65。所有线程都等待直到 e,f,g,h 调用了 finished() 方法。

这是我到目前为止所做的:

ExtrinsicSync.java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;

    ExtrinsicSync() {
        groupThreadCount = new HashMap<>();
        monitor = new ReentrantLock();
    }

@Override
public void waitForThreadsInGroup(int groupId) {
    monitor.lock();

    if (!groupThreadCount.containsKey(groupId))
        groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

    ConditionWrapper condWrapper = groupThreadCount.get(groupId);
    condWrapper.setValue(condWrapper.getValue() + 1);

    if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
    {
        condWrapper.getCondition().signalAll();
        condWrapper.setInitialStatus(false);

        System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
    } else {
        System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
        try { condWrapper.getCondition().await(); }
        catch (InterruptedException e) { e.printStackTrace(); }

    }

    monitor.unlock();
}

@Override
public void finished(int groupId) {
    monitor.lock();
    ConditionWrapper condWrapper = groupThreadCount.get(groupId);

    if(!condWrapper.getInitialStatus())
    {
        condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
        System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
        if(condWrapper.getFinishedCount() == 4)
        {
            condWrapper.setFinishedCount(0);
            condWrapper.getCondition().signalAll();
            System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
        }
    }
    monitor.unlock();
}

外部同步测试.java:

import org.junit.Test;

import java.util.EnumMap;

class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
        this.group = group;
        this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}

public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

        int nThreads = 22;

        Thread t[] = new Thread[nThreads];
        final ExtrinsicSync s1 = new ExtrinsicSync();

        for(int i = 0; i < nThreads/2; i++)
            (t[i] = new Thread(new TestTask1(66, s1))).start();

        for(int i = nThreads/2; i < nThreads; i++)
            (t[i] = new Thread(new TestTask1(70, s1))).start();

        for (Thread ti : t)
        {
            try { ti.join(100); }
            catch (Exception e) { System.out.println(e); }
        }

        EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

        for (Thread.State s : Thread.State.values())
            threadsInThisState.put(s, 0);

        for (Thread ti : t)
        {
            Thread.State state = ti.getState();
            int n = threadsInThisState.get(state);
            threadsInThisState.put(state, n + 1);
        }

        System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
}

ConditionWrapper.java:

import java.util.concurrent.locks.Condition;

public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
        this.cond = condition;
        this.value = 0;
        this.finishedCount = 0;
        this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
        return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
        return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

我遇到的问题是我能够释放每个组的前四个线程,但不知何故,某处有 2 个线程被随机终止,我无法弄清楚发生了什么。例如,将上面的 22 个线程测试用例分为两组,只有 8 个线程应该被终止,其余的线程等待。

但是这里有 10 个线程被终止。我不明白发生了什么事。我已尽我所能将代码精简到最低限度。

4

1 回答 1

2

问题在于,对于非初始线程(getInitialStatus==false),您不会向其他线程发出信号,但是当您达到其中的四个时,您仍然会终止它们。所以这就是发生的事情:

  1. 前三个线程增加计数并等待
  2. 第四个线程达到 count == 4 并设置 initial = false 并向所有其他线程发出信号并将计数设置为零
  3. 接下来的三个线程将计数增加一
  4. 8 个线程达到计数 == 4 并被终止。由于 getInitialStatus==false 这个线程不会通知其他线程。

所以 4*2 个线程 + 2 个线程被终止。正是您在测试中看到的计数。


这是实现此功能的一种潜在方法:

  1. 在每个线程或任务中使用标志 canExecute
  2. 如果允许执行线程,则使用方法 calculateState 计算当前状态并将标志设置为 true。
  3. 将所有等待的线程存储在列表或类似的东西中

所以你的任务看起来像这样:

Task
  boolean canExeute

方法 waitForThreadsInGroup 然后看起来像这样:

waitForThreadsInGroup
  monitor.lock();
      add task to list
      calculateTaskState
      condition.notifyAll
      while( ! task.canExcecute )
      {
        condition.await.
      }

  monitor.unlock();

完成方法看起来类似:

  finish
    monitor.lock();
    decrement finish count
    calculateTaskState
   condition.notifyAll
   monitor.unlock();

并计算TaskState

calculateTaskState
  if( finishCount == 0)
  {
      if( taskList.size >= 4  )
      {
         set 4 tasks in this list to can execute and remove them from the list
      }
  }

所以诀窍是将逻辑分为三个步骤:

  1. 动作,例如减少完成计数
  2. 新状态的计算。并决定是否允许执行每个线程
  3. 和线程的等待。每个线程都需要等待自己的标志
于 2020-04-08T17:26:34.523 回答