0

我基于下面这个 Reactive Streams Publisher,为 Akka Streams 创建了一个 Source:

/**
 * Factory for an Akka Streams `Source` of Flickr photo-search results.
 *
 * The API key and user id are read from the Play configuration keys
 * `flickr.apikey` and `flickr.userId`.
 */
object FlickrSource {

  // `getString` yields an Option; these are kept as Options so existing callers still compile.
  val apiKey = Play.current.configuration.getString("flickr.apikey")
  val flickrUserId = Play.current.configuration.getString("flickr.userId")

  /**
   * Search URL template. The three remaining `%s` placeholders are, in order:
   * min_taken_date, max_taken_date and the page number.
   *
   * The Option values are unwrapped before interpolation; interpolating the Options
   * directly would put the literal text `Some(...)` / `None` into the query string.
   * A missing setting fails here with a message naming the key.
   */
  val flickrPhotoSearchUrl = {
    val key = apiKey.getOrElse(sys.error("Missing configuration setting: flickr.apikey"))
    val userId = flickrUserId.getOrElse(sys.error("Missing configuration setting: flickr.userId"))
    s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$key&user_id=$userId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&page=%s&per_page=500"
  }

  /**
   * Builds a source backed by a [[FlickrPhotoSearchPublisher]] for the given date.
   *
   * @param date the day to search for
   * @return a source of photo JSON values
   */
  def byDate(date: LocalDate): Source[JsValue, Unit] = {
    Source(new FlickrPhotoSearchPublisher(date))
  }
}

/**
 * Publisher that pages through a Flickr photo search for one day and pushes every
 * photo JSON value to the subscriber.
 *
 * NOTE(review): this class never calls `subscriber.onSubscribe` and emits elements
 * without waiting for demand. The Reactive Streams specification requires both, so a
 * compliant subscriber may ignore or reject these signals. Prefer a library-provided
 * publisher (for example an `ActorPublisher`) over this class.
 *
 * @param date the day whose photos are searched
 */
class FlickrPhotoSearchPublisher(date: LocalDate) extends Publisher[JsValue] {

  /**
   * Starts the paged search and forwards results to `subscriber`.
   *
   * @param subscriber receives `onNext` per photo, then `onComplete`, or `onError` on failure
   */
  override def subscribe(subscriber: Subscriber[_ >: JsValue]): Unit = {
    try {
      // Use the requested date (not "today") and convert epoch milliseconds to seconds.
      val fromSeconds = date.toDateTimeAtStartOfDay.getMillis / 1000
      val toSeconds = date.plusDays(1).toDateTimeAtStartOfDay.getMillis / 1000

      // Fetches one page, emits its photos, then either recurses to the next page or completes.
      def pageGet(page: Int): Unit = {
        val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds, page)
        Logger.debug("Flickr search request: " + url)
        WS.url(url).get().map { response =>
          val json = response.json
          val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
          val numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt
          Logger.debug(s"pages: $numPages")
          Logger.debug(s"photos this page: ${photosThisPage.value.size}")
          photosThisPage.value.foreach { photo =>
            Logger.debug(s"onNext")
            subscriber.onNext(photo)
          }

          if (numPages > page) {
            Logger.debug("nextPage")
            pageGet(page + 1)
          } else {
            Logger.debug("onComplete")
            subscriber.onComplete()
          }
        }.recover {
          // The surrounding try/catch cannot see failures raised inside the Future
          // (HTTP errors, unexpected JSON), so they are reported here.
          case scala.util.control.NonFatal(ex) => subscriber.onError(ex)
        }
      }
      pageGet(1)
    } catch {
      // Only covers failures raised synchronously while starting the first request.
      case scala.util.control.NonFatal(ex) =>
        subscriber.onError(ex)
    }
  }
}

它会向 Flickr 发出搜索请求,并把结果以 JsValue 的形式发出。我尝试过把它连接到多种不同的 Flow 和 Sink,下面是最基本的配置:

// Source of photo JSON values for `date`.
val source: Source[JsValue, Unit] = FlickrSource.byDate(date)
// Sink that prints every element it receives; it materializes a Future[Unit].
val sink: Sink[JsValue, Future[Unit]] = Sink.foreach(photo => println(photo))
// Connect source to sink, keeping the sink's materialized value (Keep.right).
val stream = source.toMat(sink)(Keep.right)
stream.run()

我看到 onNext 被调用了几次,随后是 onComplete。但是 Sink 什么也没有收到。我遗漏了什么?还是说这并不是创建 Source 的有效方法?

4

1 回答 1

0

我误以为 Publisher 是一个类似 Observable 的简单接口,可以自行实现。Akka 团队指出,这并不是实现 Publisher 的正确方式。实际上 Publisher 是一个复杂的类,应该由库而不是最终用户来实现。问题中使用的 Source.apply(Publisher) 方法是为了与其他 Reactive Streams 实现互操作而提供的。

我之所以想实现一个 Source,是因为我需要一个支持背压的源来从 Flickr 获取搜索结果(每个请求最多 500 条),并且不希望发出比下游所需更多(或更快)的请求。这可以通过实现 ActorPublisher 来做到。

更新

下面这个 ActorPublisher 实现了我想要的功能:创建一个产生搜索结果的 Source,但只发出下游所需数量的 REST 调用。我认为还有改进的余地,欢迎随时编辑。

import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import org.joda.time.LocalDate
import play.api.Play.current
import play.api.libs.json.{JsArray, JsNumber, JsValue}
import play.api.libs.ws.WS
import play.api.{Logger, Play}

import scala.concurrent.ExecutionContext.Implicits.global

/**
 * Companion holding the Flickr search URL template and the `Props` factory for
 * [[FlickrSearchActorPublisher]].
 *
 * The API key and user id are read from the Play configuration keys
 * `flickr.apikey` and `flickr.userId`.
 */
object FlickrSearchActorPublisher {
  // `getString` yields an Option; these are kept as Options so existing callers still compile.
  val apiKey = Play.current.configuration.getString("flickr.apikey")
  val flickrUserId = Play.current.configuration.getString("flickr.userId")

  /**
   * Search URL template. The two `%s` placeholders are min_taken_date and
   * max_taken_date; the page number is appended after the trailing `page=`.
   *
   * The Option values are unwrapped before interpolation; interpolating the Options
   * directly would put the literal text `Some(...)` / `None` into the query string.
   * A missing setting fails here with a message naming the key.
   */
  val flickrPhotoSearchUrl = {
    val key = apiKey.getOrElse(sys.error("Missing configuration setting: flickr.apikey"))
    val userId = flickrUserId.getOrElse(sys.error("Missing configuration setting: flickr.userId"))
    s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$key&user_id=$userId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&per_page=500&page="
  }

  /**
   * Creates the `Props` for a publisher that searches photos taken on one day.
   *
   * @param from the day to search; the range ends at the start of the following day
   * @return props for a [[FlickrSearchActorPublisher]] bound to the resulting URL
   */
  def byDate(from: LocalDate): Props = {
    // Epoch milliseconds converted to seconds.
    val fromSeconds = from.toDateTimeAtStartOfDay.getMillis / 1000
    val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis / 1000
    val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds)

    Props(new FlickrSearchActorPublisher(url))
  }
}

/**
 * Actor publisher that emits Flickr photo-search results page by page.
 *
 * A page is requested when the actor starts and whenever the buffered photos run out.
 * Buffered photos are emitted only while the subscriber has outstanding demand.
 *
 * The HTTP response is turned into a message sent to `self`, so all state changes,
 * `onNext` calls and `context.become` calls happen inside the actor's message handling
 * rather than on the Future's callback thread.
 *
 * @param url search URL ending in `page=`; the page number is appended to it
 */
class FlickrSearchActorPublisher(url: String) extends ActorPublisher[JsValue] {

  // Internal messages carrying the outcome of one page request back to this actor.
  private case class PageLoaded(found: Seq[JsValue], pages: Int)
  private case class PageFailed(cause: Throwable)

  var currentPage = 1
  var numPages = 1
  var photos = Seq[JsValue]()

  /** Behavior while a page request is in flight. */
  def searching: Receive = {
    case PageLoaded(found, pages) =>
      numPages = pages
      Logger.debug(s"page $currentPage of $numPages")
      Logger.debug(s"photos this page: ${found.size}")
      photos = found
      if (photos.isEmpty) {
        Logger.debug("No photos found, stopping")
        onCompleteThenStop()
      } else {
        currentPage = currentPage + 1
        // Switch state first: sendSearchResults() may start the next page request and
        // move back to `searching`, which must not be overwritten afterwards.
        context.become(accepting)
        sendSearchResults()
      }
    case PageFailed(cause) =>
      // Without this the stream would never terminate after a failed request.
      Logger.error("Flickr search request failed, stopping", cause)
      onErrorThenStop(cause)
    case Request(count) =>
      Logger.debug(s"Received Request for $count results from Subscriber, ignoring as we are still searching")
    case Cancel =>
      Logger.info("Cancel Message Received, stopping")
      context.stop(self)
    case _ =>
  }

  /** Behavior while photos are buffered and can be emitted on demand. */
  def accepting: Receive = {
    case Request(count) =>
      Logger.debug(s"Received Request for $count results from Subscriber")
      sendSearchResults()
    case Cancel =>
      Logger.info("Cancel Message Received, stopping")
      context.stop(self)
    case _ =>
  }

  /**
   * Completes the stream when all pages were consumed; otherwise switches to
   * `searching` and requests the next page asynchronously.
   */
  def getNextPageOrStop(): Unit = {
    if (currentPage > numPages) {
      Logger.debug("No more pages, stopping")
      onCompleteThenStop()
    } else {
      val pageUrl = url + currentPage
      Logger.debug("Flickr search request: " + pageUrl)
      context.become(searching)
      // Capture the ActorRef so the callback touches nothing else of this actor.
      val replyTo = self
      WS.url(pageUrl).get().map { response =>
        val json = response.json
        val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
        val pages = (json \ "photos" \ "pages").as[JsNumber].value.toInt
        PageLoaded(photosThisPage.value, pages)
      }.onComplete {
        case scala.util.Success(page) => replyTo ! page
        case scala.util.Failure(cause) => replyTo ! PageFailed(cause)
      }
    }
  }

  /**
   * Emits buffered photos while there is demand, then requests the next page
   * (or completes) once the buffer is empty.
   */
  def sendSearchResults(): Unit = {
    // `photos.nonEmpty` guards `head`: demand can exceed what is left of the current page.
    while (isActive && totalDemand > 0 && photos.nonEmpty) {
      onNext(photos.head)
      photos = photos.tail
    }
    if (isActive && photos.isEmpty) {
      getNextPageOrStop()
    }
  }

  // Request the first page as soon as the actor is created.
  getNextPageOrStop()
  val receive = searching
}
于 2015-09-01T07:54:29.450 回答