1

我在 fs2 中使用 pub-sub 模式。我在处理消息流时动态创建主题和订阅者。出于某种原因,我的订阅者只收到初始消息,但进一步发布的消息永远不会到达订阅者

def startPublisher2[In](inputStream: Stream[F, Event]): Stream[F, Unit] = {
    inputStream.through(processingPipe)
  }

  val processingPipe: Pipe[F, Event, Unit] = { inputStream =>
    inputStream.flatMap {
      case message: Message[_] => initSubscriber(message)
        .flatMap { topic => Stream.eval(topic.publish1(message)) }
    }
  }

  def initSubscriber[In](message: Message[In]): Stream[F,Topic[F, Event]] = {

    Option(sessions.get(message.sessionId)) match {
      case None =>
        println(s"=== Create new topic for sessionId=${message.sessionId}")

        val topic = Topic[F, Event](message)
        sessions.put(message.sessionId, topic)
         Stream.eval(topic) flatMap  {t =>
           //TODO: Is there a better solution?
           Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)
         }

      case Some(topic) =>
        println(s"=== Existing topic for sessionId=${message.sessionId}")
        Stream.eval(topic)
    }
  }

订阅者代码很简单:

def startSubscribers2(topic: Topic[F, Event]): Stream[F, Unit] = {
    def processEvent(): Pipe[F, Event, Unit] =
      _.flatMap {
        case e@Text(_) =>
          Stream.eval(F.delay(println(s"Subscriber processing event: $e")))
        case Message(content, sessionId) =>
          //Thread.sleep(2000)
          Stream.eval(F.delay(println(s"Subscriber #$sessionId got message: ${content}")))
        case Quit =>
          println("Quit")
          Stream.eval(interrupter.set(true))
      }

    topic.subscribe(10).through(processEvent())
  }

输出如下:

=== Create new topic for sessionId=11111111-1111-1111-1111-111111111111
Subscriber #11111111-1111-1111-1111-111111111111 got message: 1
=== Existing topic for sessionId=11111111-1111-1111-1111-111111111111
=== Create new topic for sessionId=22222222-2222-2222-2222-222222222222
Subscriber #22222222-2222-2222-2222-222222222222 got message: 1
=== Create new topic for sessionId=33333333-3333-3333-3333-333333333333
Subscriber #33333333-3333-3333-3333-333333333333 got message: 1
=== Existing topic for sessionId=22222222-2222-2222-2222-222222222222
=== Existing topic for sessionId=22222222-2222-2222-2222-222222222222

我没有看到发布到现有主题的消息。另外,我想知道是否有更好的方法来启动订阅者的异步流,而不是Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)

4

0 回答 0