1

我正在尝试使用 akka-streams 的客户端向数据库发送查询Tcp,但我不明白我缺少什么。

所以我有两种类型QueryResponse它们可以完美地与akka's 相互转换ByteString。所以我正在创建一个与 的客户端连接val conn = Tcp().outgoingConnection("localhost", 28015),这给了我一个Flow[ByteString, ByteString, Future[OutgoingConnection]],到目前为止一切都很好。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像构建它一样Source(Future.successful(query)),并将它连接到流source.via(flow),这给了我另一个Source[Response, Unit]。在这里我无法理解如何获取Future[Response],尝试了几个组合器,但它给了我Materialized价值,我不完全理解它与流程中的值/类型的关系。

4

2 回答 2

1

首先:它是什么类型的数据库,为什么要直接通过 TCP 访问它?你确定这会像你缩进它那样工作吗?您是否能够处理传入响应的框架?

至于你关于Future[Response]退出 a的问题Source[Response, Unit],它就像用 a 运行 Source 一样简单Sink.head,即像这样:(当然val res: Future[Result] = source.runWith(Sink.head)你需要一个implicit val mat = ActorMaterializer()in 范围)。

我强烈建议您在深入使用 Streams 之前花一些时间阅读 Akka Streams 文档。

于 2015-09-16T23:27:58.243 回答
0

您可以在Flowjoin上使用该方法。从文档中:

通过交叉连接输入和输出,将此流加入另一个流,创建一个 RunnableGraph。

+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+

This allows you to connect the output from the connection to your Flow's input, and also connects your Flow's output to the connections input.

Specifically, you can take the Flow generated from the outgoingConnection and join it with a Flow you created to respond to queries:

def queryDB(query : ByteString) : Future[ByteString] = ???

val concurrentQueries = 10

val queryResponder = 
  Flow[ByteString].mapAsync(concurrentQueries)(queryDB)

val server : String = ???
val port : Int = ???

//from the diagram above:
//this = connection
//other = queryResponder
Tcp().outgoingConnection(server, port).join(queryResponder).run()
于 2015-10-28T15:55:58.017 回答