我正在使用AKKA Http、Monix和Datastax Java 驱动程序为 Apache Cassandra构建一个 REST API ,我在尝试从 cassandra 获取一些项目时遇到了一些麻烦,等待查询完成并返回结果。
我能够轻松打印所有结果,但无法等待查询完成并返回所有项目。我的休息点只是返回一个空的项目数组,因为它不等待查询完成。
我有一个executeQuery
方法:
queryString: String
表示 cassandra 查询page: Int
对分页有用parameters: Any*
表示参数,如果查询需要
并返回一个Observable[Row]
.
然后,为了执行这样的查询,检索它的结果,解析它们并将它们发回,我使用Monix Observable 和 Subscription。
假设我想通过一个名为的公共字段检索一些项目pid
:
import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable
import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}
. . .
val keyspace = "my_keyspace"
val table = "items"
. . .
def getItems() : Items = {
var itemList: Items = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
WhererowToItem
只是将一行解析为Item
and Items: List[Item]
。我正在查看Task,但我不太确定它在寻找什么。
编辑
使用@Alexandru Nedelcu 解决方案,我可以items
在itemList
将它们插入其中后立即打印所有内容,但该调用仍然得到空响应:{ "items" : [] }
.
这是编辑后的代码:
def getItems() : Items = {
var itemList: List[Item] = List()
val observable: Observable[Row] = CassandraHelper.executeQuery(
"SELECT * FROM " + keyspace + "." + table,
1
)
observable.subscribe { row =>
println(itemList)
itemList ::= ItemMapper.rowToItem()(row)
Ack.Continue
}
Items(itemList)
}
如何等待结果全部解析并插入到项目中,然后将它们发回?