0

我正在使用AKKA HttpMonixDatastax Java 驱动程序为 Apache Cassandra构建一个 REST API ,我在尝试从 cassandra 获取一些项目时遇到了一些麻烦,等待查询完成并返回结果。

我能够轻松打印所有结果,但无法等待查询完成并返回所有项目。我的休息点只是返回一个空的项目数组,因为它不等待查询完成。

我有一个executeQuery方法:

  • queryString: String表示 cassandra 查询
  • page: Int对分页有用
  • parameters: Any*表示参数,如果查询需要

并返回一个Observable[Row].

然后,为了执行这样的查询,检索它的结果,解析它们并将它们发回,我使用Monix Observable 和 Subscription

假设我想通过一个名为的公共字段检索一些项目pid

import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global
import com.datastax.driver.core.Row
import monix.reactive.Observable

import cassandra.src.CassandraHelper
import item.src.entity.{Item, Items}

. . .
val keyspace = "my_keyspace"
val table = "items"
. . .

def getItems() : Items = {
  var itemList: Items = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

WhererowToItem只是将一行解析为Itemand Items: List[Item]。我正在查看Task,但我不太确定它在寻找什么。

编辑

使用@Alexandru Nedelcu 解决方案,我可以itemsitemList将它们插入其中后立即打印所有内容,但该调用仍然得到空响应:{ "items" : [] }.

这是编辑后的代码:

def getItems() : Items = {
  var itemList: List[Item] = List()
  val observable: Observable[Row] = CassandraHelper.executeQuery(
    "SELECT * FROM " + keyspace + "." + table,
    1
  )
  observable.subscribe { row =>
    println(itemList)
    itemList ::= ItemMapper.rowToItem()(row)
    Ack.Continue
  }
  Items(itemList)
}

如何等待结果全部解析并插入到项目中,然后将它们发回?

4

1 回答 1

1

据我了解,您有一个Observable[Row]并且您想从中构建一个Items,它聚合Row源流中的每个元素,对吗?

如果是这样,这foldLeftL就是您想要的,它将把每个元素聚合成一个状态,并在源流完成后返回最终结果:

// We need to suspend the Task, because your Items is probably a
// mutable object and it's best to suspend side effects ;-)
val items: Task[Items] = Task.suspend {
  val initial: Items = _
  val observable: Observable[Row] = ???

  // This returns a Task[Items] when the source completes
  observable.foldLeftL(initial) { (items, elem) =>
    items ::= ItemMapper.rowToItem()(row)
    // I don't understand if your `Items` is mutable or not
    // but returning the same reference is fine
    items
  }
}

ATask是一个懒惰的人Future。您可以将其转换为Futurewith runAsync。更多细节在这里:https ://monix.io/docs/2x/eval/task.html

于 2017-08-26T11:19:05.120 回答