5

I want to implement an client app that first send an request to server then wait for its reply(similar to http)

My client process may be

 val topic = async.topic[ByteVector]
 val client = topic.subscribe

Here is the api

trait Client {
  val incoming = tcp.connect(...)(client)
   val reqBus = topic.pubsh()
   def ask(req: ByteVector): Task[Throwable \/ ByteVector] =  {
      (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
      ???
   }
}

Then, how to implement the remain part of ask ?

4

1 回答 1

6

通常,实现是通过接收器发布消息,然后等待某个源上的某种回复,例如您的主题。

实际上,我们的代码中有很多这样的习惯用法:

def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
 merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}

本质上,这第一个挂钩回复流以等待任何结果O确认我们发送的请求。然后我们发布消息I并咨询pf任何传入O的最终翻译O2,然后终止。

于 2015-01-19T09:03:11.633 回答