0

我有两个流,我希望能够根据我每秒运行的计算只消耗一个x

我想我基本上需要创建第三个tick流 - 类似于every(3.seconds)- 进行计算,然后在其他两个之间进行某种切换。

我有点卡在这里(而且我才刚刚开始玩弄 scalaz-stream)。

谢谢!

4

1 回答 1

0

有几种方法可以解决这个问题。处理它的一种方法是使用awakeEvery. 具体例子见这里

为了简要描述该示例,假设我们希望每 5 秒查询一次 twitter 并获取推文并执行情绪分析。我们可以按如下方式组成这个管道:

val source = 
    awakeEvery(5 seconds) |> buildTwitterQuery(query) through queryChannel flatMap {
        Process emitAll _ 
    }

请注意,queryChannel可以如下表述。

def statusTask(query: Query): Task[List[Status]] = Task {
      twitterClient.search(query).getTweets.toList
}

val queryChannel: Channel[Task, Query, List[Status]] = channel lift statusTask

如果您有任何问题,请告诉我。如前所述,有关完整示例,请参阅this

我希望它有帮助!

于 2016-05-18T10:37:50.167 回答