我基于这样的 ReactiveStreams Publisher 为 Akka Stream 创建了一个 Source:
object FlickrSource {
val apiKey = Play.current.configuration.getString("flickr.apikey")
val flickrUserId = Play.current.configuration.getString("flickr.userId")
val flickrPhotoSearchUrl = s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&page=%s&per_page=500"
def byDate(date: LocalDate): Source[JsValue, Unit] = {
Source(new FlickrPhotoSearchPublisher(date))
}
}
class FlickrPhotoSearchPublisher(date: LocalDate) extends Publisher[JsValue] {
override def subscribe(subscriber: Subscriber[_ >: JsValue]) {
try {
val from = new LocalDate()
val fromSeconds = from.toDateTimeAtStartOfDay.getMillis
val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis
def pageGet(page: Int): Unit = {
val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds, page)
Logger.debug("Flickr search request: " + url)
val photosFound = WS.url(url).get().map { response =>
val json = response.json
val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
val numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt
Logger.debug(s"pages: $numPages")
Logger.debug(s"photos this page: ${photosThisPage.value.size}")
photosThisPage.value.foreach { photo =>
Logger.debug(s"onNext")
subscriber.onNext(photo)
}
if (numPages > page) {
Logger.debug("nextPage")
pageGet(page + 1)
} else {
Logger.debug("onComplete")
subscriber.onComplete()
}
}
}
pageGet(1)
} catch {
case ex: Exception => {
subscriber.onError(ex)
}
}
}
}
它将向 Flickr 发出搜索请求,并将结果作为JsValue
s 获取。我试图将它连接到许多不同的流和接收器,但这将是最基本的设置:
val source: Source[JsValue, Unit] = FlickrSource.byDate(date)
val sink: Sink[JsValue, Future[Unit]] = Sink.foreach(println)
val stream = source.toMat(sink)(Keep.right)
stream.run()
我看到onNext
被调用了几次,然后是onComplete
. 但是,Sink 没有收到任何东西。我错过了什么,这不是创建源的有效方法吗?