1

过去几天我一直面临一个关于在 Scala 中保存和处理 Futures 数据的问题。我对语言和两者的概念都很陌生。Lagom 在 Cassandra 上的文档说要实现大约 9 个代码文件,我想确保我的数据库代码在将其传播到那么多代码之前可以正常工作。

具体来说,我目前正在尝试实现一个概念证明,以向/从 lagom 为您实现的 cassandra 数据库发送数据。到目前为止,我能够向/从数据库发送和检索数据,但是我无法返回该数据,因为这一切都是异步运行的,并且还返回了成功返回的数据。

我已经玩了一段时间了;检索代码如下所示:

override def getBucket(logicalBucket: String) = ServiceCall[NotUsed, String] {
request => {
  val returnList = ListBuffer[String]()

  println("Retrieving Bucket " + logicalBucket)
  val readingFromTable = "SELECT * FROM data_access_service_impl.s3buckets;"

  //DB query
  var rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
  println(rowsFuture)

  Await.result(rowsFuture, 10 seconds)

  rowsFuture onSuccess {
    case rows => {
      println(rows)
      for (row <- rows) println(row.getString("name"))
      for (row <- rows) returnList += row.getString("name")
      println("ReturnList: " + returnList.mkString)
    }
  }

  rowsFuture onFailure {
    case e => println("An error has occured: " + e.getMessage)
    Future {"An error has occured: " + e.getMessage}
  } 

  Future.successful("ReturnList: " + returnList.mkString)
 }      
}

当它运行时,我在 onSuccess 回调中将预期的数据库值设置为“println”。但是,我在 return 语句中使用的同一个变量在回调之外打印为空(并且也返回空数据)。这也发生在我使用的“插入”函数中,它并不总是返回我在回调函数中设置的变量。

如果我尝试将语句放在回调函数中,则会收到“返回单元,期望未来 [字符串]”的错误。所以我被困在无法从回调函数中返回的地方,所以我不能保证我正在返回数据)。

我的目标是向 API 返回一个字符串,以便它显示数据库中所有 s3 存储桶名称的列表。这意味着遍历 Future[Seq[Row]] 数据类型,并将数据保存到连接字符串中。如果有人可以提供帮助,他们将解决我通过 Lagom、Akka、Datastax 和 Cassandra 文档阅读的 2 周的问题。在这一点上我大吃一惊(信息超载),我没有找到明确的指南。

作为参考,这里是 cassandraSession 文档:

LagomTutorial/Documentation Style Information 及其唯一的 cassandra-query 示例 CassandraSession.scala 代码

4

2 回答 2

7

了解Future, (and Option, and Either, and Try) 的关键是,您不会(通常)它们中获取值,而是将计算带入它们。最常见的方法是使用mapandflatMap方法。

在您的情况下,您想要将 aSeq[Row]转换为 a String。然而,你Seq[Row]被包裹在这个名为的不透明数据结构Future中,所以你不能rows.mkString像你实际上有一个Seq[Row]. 因此,与其获取值并对其执行计算,不如将计算rows.mkString带到数据中:

//DB query
val rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
val rowToString = (row: Row) => row.getString("name")
val computation = (rows: Seq[Row]) => rows.map(rowToString).mkString

// Computation to the data, rather than the other way around
val resultFuture = rowsFuture.map(computation)

现在,当rowsFuture完成时,您通过调用创建的新未来将通过调用真正关心结果rowsFuture.map来实现。computationSeq[Row]

那时您可以return resultFuture正常工作,并且一切都会按预期工作,因为调用的代码需要getBucketaFuture并将适当地处理它。

为什么Future不透明?

原因很简单,因为它代表了一个当前可能不存在的值。您只能在值存在时获取值,但是当您开始调用时,它不存在。isComplete代码允许您注册计算(回调,如onSuccessand )或使用andonFailure创建新的派生未来值,而不是让您自己轮询某个字段。mapflatMap

更深层次的原因是因为Future是 Monad 并且 monads 包含计算,但没有操作从它们中提取计算

于 2017-04-25T01:09:58.653 回答
0

将 select 替换为您的特定 select 和您要为特定字段获取的字段。示例仅用于测试,不是架构建议。

package ldg.com.dbmodule.model

/**
* Created by ldipotet on 05/11/17.
*/

import com.datastax.driver.core.{Cluster, ResultSet, ResultSetFuture}
import scala.util.{Failure, Success, Try}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._

//Use Implicit Global Context is strongly discouraged! we must create    
//our OWN execution CONTEXT !
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, _}
import scala.concurrent.duration._


object CassandraDataStaxClient {

//We create here a CallBack in Scala with the DataStax api

implicit def resultSetFutureToScala(rf: ResultSetFuture):    
Future[ResultSet] = {

val promiseResult = Promise[ResultSet]()
val producer = Future {

    getResultSet(rf) match {

       //we write a promise with an specific value   
       case Success(resultset) => promiseResult success resultset
       case Failure(e) => promiseResult failure (new 
                       IllegalStateException)

    }
  }
  promiseResult.future
}

def getResultSet: ResultSetFuture => Try[ResultSet] = rsetFuture => {
    Try(
       // Other choice can be:
       // getUninterruptibly(long timeout, TimeUnit unit) throws 
          TimeoutException
       // for an specific time

       //can deal an IOException
       rsetFuture.getUninterruptibly
   )
 }

 def main(args: Array[String]) {

    def defaultFutureUnit() = TimeUnit.SECONDS
    val time = 20 seconds
    //Better use addContactPoints and adds more tha one contact point

   val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
   val session = cluster.connect("myOwnKeySpace")

   //session.executeAsync es asyncronous so we'll have here a 
   //ResultSetFuture
  //converted to a resulset due to Implicitconversion
  val future: Future[ResultSet] = session.executeAsync("SELECT * FROM 
  myOwnTable")

  //blocking on a future is strongly discouraged!! next is an specific     
  //case
  //to make sure that all of the futures have been completed
  val results = Await.result(future,time).all()
  results.foreach(row=>println(row.getString("any_String_Field"))
  session.close()
  cluster.close()
  }
 }
于 2017-05-10T23:47:50.213 回答