0

我有 2 个线程,一个用于传输,一个用于重播我只想在 RXThread 上收到一条消息时发送一条消息。我使用了 wait() 和 notify(),为了防止在 wait() 之前出现通知,我这样做了,但它仅在调试中运行时才有效,尽管 RX 线程不发送消息。

private boolean stopped = false;


class StubTxtask implements Runnable {

    public void run() {
        try {

            // Sends all messages in sequence
            for (int i=0; i<txMsgSeq.getMessagesCount(); i++) {
            }
                if (syncRxTx) {
                    synchronized (syncObj) {
                        while(!stopped) {
                            syncObj.wait();
                    }
                    }

                }
                System.out.println("************ "+ i + "/" + txMsgSeq.getMessagesCount());
                pcs.sendMsg((GeneratedMessage)txMsgSeq.getMessage(i));
                if (!syncRxTx) {
                    Thread.sleep(1000);
                }
            }

        } catch (Exception e) {

        }
    }
}


class StubRxtask implements Runnable {

    public void run() {
        while (true) {
            try {
                // Wait for a message() 
                TncMessage msg = (TncMessage) pcs.waitMsg(connInt);
                System.out.println(msg.toString());
                // Add the message to the RX Sequence
                rxMsgSeq.addMessage(msg);

                System.out.println(rxMsgSeq.getMessagesCount());

                if (syncRxTx) {
                    TncHeader header;
                    Method invokeGetHeader;
                    try {
                        invokeGetHeader = msg.getClass().getMethod("getHeader", null);
                        header = (TncHeader) invokeGetHeader.invoke(msg, null);
                        if (header.getType() != EnumMessageType.ACK) {
                            synchronized (syncObj) {
                                stopped = true;
                                syncObj.notify();
                            }

                        }


                    } catch (Exception e) {
                        System.err.println("ERROR - Impossible to find or invoke getHeader() method on msg");
                    }


                }
                stopped = false;

            } catch (Exception e) {

            }
        }
    }
}
4

3 回答 3

0

如果它只能在调试中正常工作,这通常是由于时序问题而发生的,这在调试模式下是不同的。

据我了解您的代码,使用Phaser将满足您的要求。移相器就像一个屏障:它导致所有线程等待,直到所有线程都在等待它,除了它是可重用的。由于现在两个线程都在等待另一个线程到达,因此您不再需要stopped构建或同步(所有都由 Phaser 在内部处理)。

代替

if (syncRxTx) {
  synchronized (syncObj) {
    while(!stopped) {
      syncObj.wait();
    }
  }
}

if (syncRxTx) {
  phaser.arriveAndAwaitAdvance();
}

if (header.getType() != EnumMessageType.ACK) {
  synchronized (syncObj) {
    stopped = true;
    syncObj.notify();
  }
}

if (header.getType() != EnumMessageType.ACK) {
  phaser.arriveAndAwaitAdvance();
}

注意:与同步对象一样,Phaser 应声明为 final。

于 2013-11-08T11:43:08.523 回答
0

改用 Semaphore 进行同步,因为这些锁处理“提前通知”。

Semaphore sem = new Semaphore(0);

runConsume() {
  .. sem.aquire(1); // will block if nothing avaiable
}

runProduce() {

  // receivedMessage
  .. sem.release(1);
}
于 2013-11-08T14:03:54.030 回答
0

如果您while在另一个线程的循环中使用变量并想要获取新数据,则必须使用volatile(检查visibility规则)声明它

于 2013-11-08T14:06:54.820 回答