1

我是 ZeroMQ 的新手,并且似乎在我的begin()方法中循环丢失了消息。

我想知道我是否错过了一个我没有排队消息或什么的东西?

当我在我的发布者上引发一个事件时,它会向我的订阅者发送两条消息,中间有一个小间隔,我似乎没有收到第二条被中继的消息。我错过了什么?

class ZMQSubscriber[T <: Transaction, B <: Block](
  socket: InetSocketAddress,
  hashTxListener: Option[HashDigest => Future[Unit]],
  hashBlockListener: Option[HashDigest => Future[Unit]],
  rawTxListener: Option[Transaction => Future[Unit]],
  rawBlockListener: Option[Block => Future[Unit]]) {
  private val logger = BitcoinSLogger.logger

  def begin()(implicit ec: ExecutionContext) = {
    val context = ZMQ.context(1)

    //  First, connect our subscriber socket
    val subscriber = context.socket(ZMQ.SUB)
    val uri = socket.getHostString + ":" + socket.getPort

    //subscribe to the appropriate feed
    hashTxListener.map { _ =>
      subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to the transaction hashes from zmq")
    }

    rawTxListener.map { _ =>
      subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to raw transactions from zmq")
    }

    hashBlockListener.map { _ =>
      subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to the hashblock stream from zmq")
    }

    rawBlockListener.map { _ =>
      subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to raw block")
    }

    subscriber.connect(uri)
    subscriber.setRcvHWM(0)
    logger.info("Connection to zmq client successful")

    while (true) {
      val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
      val body = subscriber.recv(ZMQ.DONTWAIT)
      Future(processMsg(notificationTypeStr, body))
    }
  }

  private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = Future {

    val notification = ZMQNotification.fromString(topic)
    val res: Option[Future[Unit]] = notification.flatMap {
      case HashTx =>
        hashTxListener.map { f =>
          val hash = Future(DoubleSha256Digest.fromBytes(body))
          hash.flatMap(f(_))
        }
      case RawTx =>
        rawTxListener.map { f =>
          val tx = Future(Transaction.fromBytes(body))
          tx.flatMap(f(_))
        }
      case HashBlock =>
        hashBlockListener.map { f =>
          val hash = Future(DoubleSha256Digest.fromBytes(body))
          hash.flatMap(f(_))
        }
      case RawBlock =>
        rawBlockListener.map { f =>
          val block = Future(Block.fromBytes(body))
          block.flatMap(f(_))
        }
    }
  }
}
4

2 回答 2

0

所以这似乎已经通过ZMsg.recvMsg()while-loop 中使用 a 而不是

  val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
  val body = subscriber.recv(ZMQ.DONTWAIT)

我不确定为什么会这样,但确实如此。所以这就是我的begin方法现在的样子

    while (run) {
      val zmsg = ZMsg.recvMsg(subscriber)
      val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
      val body = zmsg.pop().getData
      Future(processMsg(notificationTypeStr, body))
    }
    Future.successful(Unit)
  }
于 2018-04-23T00:42:50.273 回答
0

我错过了什么?

阻塞v / s非阻塞作案手法如何工作:

诀窍在于对方法的相应调用的(非)阻塞模式.recv()

因此,对 -method 的第二次调用会subscriber.recv( ZMQ.DONTWAIT )立即返回,因此您的第二部分 ( the body) 可能并且将在法律上不包含任何内容,即使您的承诺声明确实从发布者端发送了一对消息(一对.send()方法调用 -也有人可能会反对,发件人实际上可能只是以多部分方式发送了一条消息——MCVE 代码在这部分并不具体)。

因此,一旦您将代码从非阻塞模式(在 O/P 中)移动到主要阻塞模式(锁定/同步代码的进一步流与任何合理格式到达的外部事件消息,不早于返回),在:

val zmsg = ZMsg.recvMsg(subscriber) // which BLOCKS-till-a-1st-zmsg-arrived

两个进一步处理的.pop()部分都只是卸载组件(参考上面ZMsg实际发送的发布方实际发送的实际多部分结构的注释)


安全下一个:
unlimited alloc-s v / s强制阻止/丢弃消息?

该代码在几个方面让我感到惊讶。除了对.connect()-method 的“延迟”调用之外,与所有先前的 socket-archetype 详细设置相比(通常在请求建立连接之后“安排”)。虽然这可能按预期工作得很好,但它为.Context()-instance 提供了更紧(更小)的时间窗口来设置和(重新)协商所有相关的连接细节以成为 RTO。

有一条特别的线引起了我的注意:subscriber.setRcvHWM( 0 )这是一个依赖于版本原型的技巧。然而,零值会导致应用程序变得脆弱,我不建议在任何生产级应用程序中这样做。

于 2018-04-23T11:09:02.690 回答