1

我正在玩一些 NATS 流媒体,但订阅者速率限制有问题。当我将飞行中的最大值设置为 1 并将超时设置为 1 秒并且我有一个基本上是 Thread.sleep(1000) 的消费者时,我会多次获得相同的事件。我认为通过限制飞行和使用手动确认这不应该发生。我怎样才能在非常慢的消费者身上得到非常好的一次交付?

  case class EventBus[I, O](inputTopic: String, outputTopic: String, connection: Connection, eventProcessor: StatefulEventProcessor[I, O]) {
    // the event bus could be some abstract class while the `Connection` coulbd be injected using DI
    val substritionOptions: SubscriptionOptions = new SubscriptionOptions.Builder()
                                                                         .setManualAcks(true)
                                                                         .setDurableName("foo")
                                                                         .setMaxInFlight(1)
                                                                         .setAckWait(1, TimeUnit.SECONDS)
                                                                         .build()

    if (!inputTopic.isEmpty) {
      connection.subscribe(inputTopic, new MessageHandler() {
        override def onMessage(m: Message) {
          m.ack()
          try {
            val event = eventProcessor.deserialize(m.getData)
            eventProcessor.onEvent(event)
          } catch {
            case any =>
              try {
                val command = new String(m.getData)
                eventProcessor.onCommand(command)
              } catch {
                case any => println(s"de-serialization error: $any")
              }
          } finally {
            println("got event")
          }
        }
      }, substritionOptions)
    }

    if (!outputTopic.isEmpty) {
      eventProcessor.setBus(e => {
        try {
          connection.publish(outputTopic, eventProcessor.serialize(e))
        } catch {
          case ex => println(s"serialization error $ex")
        }
      })
    }
  }


  abstract class StatefulEventProcessor[I, O] {
    private var bus: Option[O => Unit] = None
    def onEvent(event: I): Unit
    def onCommand(command: String): Unit

    def serialize(o: O): Array[Byte] =
      SerializationUtils.serialize(o.asInstanceOf[java.io.Serializable])

    def deserialize(in: Array[Byte]): I =
      SerializationUtils.deserialize[I](in)

    def setBus(push: O => Unit): Unit = {
      if (bus.isDefined) {
        throw new IllegalStateException("bus already set")
      } else {
        bus = Some(push)
      }
    }

    def push(event: O) =
      bus.get.apply(event)
  }


  EventBus("out-1", "out-2", sc, new StatefulEventProcessor[String, String] {
    override def onEvent(event: String): Unit = {
      Thread.sleep(1000)
      push("!!!" + event)
    }

    override def onCommand(command: String): Unit = {}
  })

  (0 until 100).foreach(i => sc.publish("out-1", SerializationUtils.serialize(s"test-$i")))
4

1 回答 1

3

首先,NATS Streaming 没有完全一次(重新)交付的保证。MaxInflight 为您提供的保证是,在未确认的消息数量低于该数量之前,服务器不会向订阅者发送新消息。因此,在 MaxInflight(1) 的情况下,您要求服务器仅在收到来自先前传递的消息的确认后才发送下一条新消息。但是,这不会阻止重新传递未确认的消息。

服务器不保证或不知道消息实际上已被订阅者接收。这就是 ACK 的用途,让服务器知道消息已被订阅者正确处理。如果服务器不支持重新交付(即使达到 MaxInflight 时),那么“丢失”消息将永远停止您的订阅。请记住,NATS 流服务器和客户端之间没有直接通过 TCP 连接相互连接(它们都连接到 NATS 服务器,即 gnatsd)。

于 2017-07-28T21:10:32.203 回答