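I am using the pub-sub pattern in fs2. I create topics and subscribers dynamically while processing a stream of messages. For some reason, my subscribers receive only the initial message; messages published afterwards never reach them.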

    def startPublisher2[In](inputStream: Stream[F, Event]): Stream[F, Unit] = {
      inputStream.through(processingPipe)
    }

    val processingPipe: Pipe[F, Event, Unit] = { inputStream =>
      inputStream.flatMap {
        case message: Message[_] =>
          initSubscriber(message)
            .flatMap { topic => Stream.eval(topic.publish1(message)) }
      }
    }

    def initSubscriber[In](message: Message[In]): Stream[F, Topic[F, Event]] = {
      Option(sessions.get(message.sessionId)) match {
        case None =>
          println(s"=== Create new topic for sessionId=${message.sessionId}")
          val topic = Topic[F, Event](message)
          sessions.put(message.sessionId, topic)
          Stream.eval(topic) flatMap { t =>
            //TODO: Is there a better solution?
            Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)
          }
        case Some(topic) =>
          println(s"=== Existing topic for sessionId=${message.sessionId}")
          Stream.eval(topic)
      }
    }

The subscriber code is simple:

    def startSubscribers2(topic: Topic[F, Event]): Stream[F, Unit] = {
      def processEvent(): Pipe[F, Event, Unit] =
        _.flatMap {
          case e@Text(_) =>
            Stream.eval(F.delay(println(s"Subscriber processing event: $e")))
          case Message(content, sessionId) =>
            //Thread.sleep(2000)
            Stream.eval(F.delay(println(s"Subscriber #$sessionId got message: ${content}")))
          case Quit =>
            println("Quit")
            Stream.eval(interrupter.set(true))
        }

      topic.subscribe(10).through(processEvent())
    }

The output is as follows:

    === Create new topic for sessionId=11111111-1111-1111-1111-111111111111
    Subscriber #11111111-1111-1111-1111-111111111111 got message: 1
    === Existing topic for sessionId=11111111-1111-1111-1111-111111111111
    === Create new topic for sessionId=22222222-2222-2222-2222-222222222222
    Subscriber #22222222-2222-2222-2222-222222222222 got message: 1
    === Create new topic for sessionId=33333333-3333-3333-3333-333333333333
    Subscriber #33333333-3333-3333-3333-333333333333 got message: 1
    === Existing topic for sessionId=22222222-2222-2222-2222-222222222222
    === Existing topic for sessionId=22222222-2222-2222-2222-222222222222

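I do not see the messages that are published to existing topics. Also, I would like to know whether there is a better way to start the subscribers' asynchronous stream than Stream.empty.interruptWhen(interrupter) concurrently startSubscribers2(t)?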
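
One thing that may be worth checking, offered as a sketch rather than a confirmed diagnosis: Topic[F, Event](message) returns an F[Topic[F, Event]], which is a description of how to build a topic, not a topic. If that description is what gets stored in sessions, every Stream.eval(topic) would build a fresh topic that has no subscribers. The example below assumes fs2 2.x on cats-effect 2 (the versions where Topic takes an initial value), uses IO and String in place of F and Event, and introduces a hypothetical topicFor helper and Ref-based registry that are not part of the code above.

```scala
// Assumption: fs2 2.x with cats-effect 2, where Topic(initial) is available.
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.concurrent.Ref
import fs2.concurrent.Topic

object TopicRegistrySketch extends IOApp {

  // Hypothetical registry: it stores the topics themselves, not the effects that build them.
  type Registry = Ref[IO, Map[String, Topic[IO, String]]]

  // Hypothetical helper: returns the stored topic for a session, creating it on first use.
  // The get-then-update is not atomic, which is acceptable only for sequential callers.
  def topicFor(registry: Registry, sessionId: String): IO[Topic[IO, String]] =
    registry.get.flatMap { topics =>
      topics.get(sessionId) match {
        case Some(existing) => IO.pure(existing)
        case None =>
          Topic[IO, String]("init").flatMap { created =>
            registry.update(_ + (sessionId -> created)).map(_ => created)
          }
      }
    }

  def run(args: List[String]): IO[ExitCode] = {
    // A description of "create a topic"; nothing has been created yet.
    val recipe: IO[Topic[IO, String]] = Topic[IO, String]("init")

    for {
      a <- recipe
      b <- recipe
      // Each evaluation of the recipe builds a separate topic instance.
      _ <- IO(println(s"two evaluations give the same topic: ${a eq b}")) // prints false
      registry <- Ref.of[IO, Map[String, Topic[IO, String]]](Map.empty)
      c <- topicFor(registry, "s1")
      d <- topicFor(registry, "s1")
      // The registry hands back the instance that was created the first time.
      _ <- IO(println(s"two registry lookups give the same topic: ${c eq d}")) // prints true
    } yield ExitCode.Success
  }
}
```

On the second part of the question: concurrently interrupts the background stream once the foreground stream finishes, and Stream.empty finishes immediately, so the subscriber may be stopped shortly after it starts. One direction to consider, again only as a suggestion, is to have the pipe emit each subscriber stream as an element of an outer stream and run them with parJoinUnbounded, so their lifetime is tied to the whole program rather than to a single flatMap step.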