我正在尝试将 ZMQ 订阅者嵌入到 Runnable 中。我能够第一次启动 Runnable,一切似乎都很好。问题是当我中断线程并尝试启动一个新线程时,订阅者没有收到任何消息。例如:
我有一个可运行的发布者
class ZMQPublisherRunnable() extends Runnable { override def run() { val ZMQcontext = ZMQ.context(1) val publisher = ZMQcontext.socket(ZMQ.PUB) var count = 0 publisher.connect(s"tcp://127.0.0.1:16666") while (!Thread.currentThread().isInterrupted) { try { println(s"PUBLISHER -> $count") publisher.send(s"PUBLISHER -> $count") count += 1 Thread.sleep(1000) } catch { case e: Exception => println(e.getMessage) publisher.disconnect(s"tcp://127.0.0.1:16666") ZMQcontext.close() } } } }
我有一个订阅者可运行:
class ZMQSubscriberRunnable1() extends Runnable { override def run() { println("STARTING SUBSCRIBER") val ZMQcontext = ZMQ.context(1) val subscriber = ZMQcontext.socket(ZMQ.SUB) subscriber.subscribe("".getBytes) subscriber.bind(s"tcp://127.0.0.1:16666") while (!Thread.currentThread().isInterrupted) { try { println("waiting") val mesg = new String(subscriber.recv(0)) println(s"SUBSCRIBER -> $mesg") } catch { case e: Exception => println(e.getMessage) subscriber.unbind("tcp://127.0.0.1:16666") subscriber.close() ZMQcontext.close() } } } }
我的主要代码如下所示:
object Application extends App { val zmqPUB = new ZMQPublisherRunnable val zmqThreadPUB = new Thread(zmqPUB, "MY_PUB") zmqThreadPUB.setDaemon(true) zmqThreadPUB.start() val zmqRunnable = new ZMQSubscriberRunnable1 val zmqThread = new Thread(zmqRunnable, "MY_TEST") zmqThread.setDaemon(true) zmqThread.start() Thread.sleep(10000) zmqThread.interrupt() zmqThread.join() Thread.sleep(2000) val zmqRunnable_2 = new ZMQSubscriberRunnable1 val zmqThread_2 = new Thread(zmqRunnable_2, "MY_TEST_2") zmqThread_2.setDaemon(true) zmqThread_2.start() Thread.sleep(10000) zmqThread_2.interrupt() zmqThread_2.join() }
第一次启动订阅者时,我能够接收所有消息:
STARTING SUBSCRIBER
PUBLISHER -> 0
waiting
PUBLISHER -> 1
SUBSCRIBER -> PUBLISHER -> 1
waiting
PUBLISHER -> 2
SUBSCRIBER -> PUBLISHER -> 2
waiting
PUBLISHER -> 3
SUBSCRIBER -> PUBLISHER -> 3
waiting
...
一旦我中断线程并从同一个 Runnable 启动一个新线程,我就无法再阅读消息了。它永远在等待
STARTING SUBSCRIBER
waiting
PUBLISHER -> 13
PUBLISHER -> 14
PUBLISHER -> 15
PUBLISHER -> 16
PUBLISHER -> 17
...
关于我做错了什么的任何见解?
谢谢