我正在尝试使用 akka-streams 的客户端向数据库发送查询Tcp
,但我不明白我缺少什么。
所以我有两种类型Query
,Response
它们可以完美地与akka's 相互转换ByteString
。所以我正在创建一个与 的客户端连接val conn = Tcp().outgoingConnection("localhost", 28015)
,这给了我一个Flow[ByteString, ByteString, Future[OutgoingConnection]]
,到目前为止一切都很好。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像构建它一样Source(Future.successful(query))
,并将它连接到流source.via(flow)
,这给了我另一个Source[Response, Unit]
。在这里我无法理解如何获取Future[Response]
,尝试了几个组合器,但它给了我Materialized
价值,我不完全理解它与流程中的值/类型的关系。