2

我是 scala 的新手,对 scalaz 非常陌生。通过不同的 stackoverflow 答案和一些手把手,我能够使用 scalaz.stream 来实现一个可以持续获取 twitter API 结果的 Process。现在我想对存储 twitter 句柄的 Cassandra DB 做同样的事情。

获取推特结果的代码在这里:

def urls: Seq[(Handle,URL)] = {
 Await.result(

   getAll(connection).map { List =>
      List.map(twitterToGet =>
   (twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
   )
  },
    5 seconds)

}

val fetchUrl = channel.lift[Task, (Handle, URL), Fetched] {
  url => Task.delay {

    val finalResult = callTwitter(url)
    if (finalResult.tweets.nonEmpty) {
      connection.updateTwitter(finalResult)
    } else {
      println("\n" + finalResult.handle + " does not have new tweets")
    }
    s"\ntwitter Fetch & database update completed"

  }
}

val P = Process
val process =
  (time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
    through(fetchUrl)

val fetched = process.runLog.run
fetched.foreach(println)

我打算做的是使用

def urls: Seq[(Handle,URL)] = {

持续获取 Cassandra 结果(使用 awakeEvery)并将它们发送给演员以运行上述 twitter 获取代码。

我的问题是,用 scalaz.stream 实现这一点的最佳方法是什么?请注意,我希望它获得所有数据库结果,然后在再次获得所有数据库结果之前有一个延迟。我应该使用与上面的 twitter 获取代码相同的架构吗?如果是这样,我将如何创建一个不需要输入的 channel.lift?scalaz.stream 中是否有更好的方法?

提前致谢

4

1 回答 1

1

今天得到这个工作。最简洁的方法是将数据库结果作为流发出,并将接收器附加到流的末尾以进行 twitter 处理。实际上我所拥有的要复杂一些,因为它会不断地检索数据库结果并将它们发送给参与者进行 twitter 处理。检索结果的风格遵循我的问题中的原始代码:

val connection = new simpleClient(conf.getString("cassandra.node"))

implicit val threadPool = new ScheduledThreadPoolExecutor(4)
val system = ActorSystem("mySystem")
val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterFetch")

  def myEffect = channel.lift[Task, simpleClient, String]{
    connection: simpleClient => Task.delay{

      val results = Await.result(
        getAll(connection).map { List =>
          List.map(twitterToGet =>
            (twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
          )
        },
        5 seconds)

      println("Query Successful, results= " +results +" at " + format.print(System.currentTimeMillis()))

      twitterFetch ! fetched(connection, results)
      s"database fetch completed"
    }
  }

  val P = Process
  val process =
    (time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
      through(myEffect)))

  val fetching = process.runLog.run
  fetching.foreach(println)

一些注意事项:

我曾询问过在没有输入的情况下使用 channel.lift,但很明显输入应该是 cassandra 连接。

线

val process =
(time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
  through(myEffect)))

从 zipWith 更改为 flatMap 因为我想连续检索结果而不是一次。

于 2015-06-09T18:09:40.870 回答