I recently became interested in trying out some of the streaming capabilities in the Scala world. This happened while I was reading up on the Iteratee API in Play 2.

However, people seem to think the Iteratee API is close to deprecation, and have recommended one of the following libraries instead:

  • scalaz-stream
  • akka-streams, or more specifically, akka-http

I didn't really feel like getting into the scalaz world, so I decided to check out akka-http.

Unfortunately, documentation seems to be very sparse on the subject of akka-http at the moment, and I'm having a lot of trouble getting everything to work.

As per usual, I chose everyone's favourite source of streaming data to play around with: Twitter.

Googling the subject mostly leads to either

  • Matthias Nehlsen's excellent work on his BirdWatch project. Unfortunately, it still uses Iteratees.
  • People using akka-streams with a ready-made Twitter client. I'd rather avoid those, as I wouldn't learn much from them.

Performing a basic GET request with akka-http seems to be structured somewhat like this:

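import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AkkaHttpExample extends App {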

    implicit val system = ActorSystem("akka-http-example")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher 

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("www.stackoverflow.com", 80)

    val request: HttpRequest = HttpRequest(
        HttpMethods.GET,
        uri = "/"
    )

    val future: Future[HttpResponse] =
        Source.single(request)
            .via(connectionFlow)
            .runWith(Sink.head)

    val result: HttpResponse = Await.result(future, 5 seconds)
}

The code above works (though parsing the body of the results is pretty annoying for some reason).
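
For what it's worth, one way to read a finite response body as a string might look like the sketch below. It assumes the same akka-http scaladsl used above; `ResponseBody` and `asString` are names made up for illustration, not part of the library. It folds all chunks into memory, so it only suits responses that actually end, not an endless stream.

```scala
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of akka-http): collects the whole
// entity into memory and decodes it as UTF-8.
object ResponseBody {
  def asString(response: HttpResponse)
              (implicit mat: Materializer, ec: ExecutionContext): Future[String] =
    response.entity.dataBytes
      .runFold(ByteString.empty)(_ ++ _) // concatenate all chunks
      .map(_.utf8String)                 // decode the bytes as UTF-8
}
```

With the implicits from the example in scope, `ResponseBody.asString(result)` would yield a `Future[String]` holding the page body.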

When I try to do the same but pointing to the /1.1/statuses/sample.json endpoint (which should emit an example stream), my Future just sits there and times out. While this may seem logical given the streaming nature of the data, this should instead return a 404, because at this point I'm not even doing proper OAuth.

For reference, this is my code at this point:

object AkkaHttpExample extends App {

    implicit val system = ActorSystem("akka-http-example")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher    

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)

    val request: HttpRequest = HttpRequest(
        HttpMethods.GET,
        uri = "/1.1/statuses/sample.json"
    )

    val future: Future[HttpResponse] =
        Source.single(request)
            .via(connectionFlow)
            .runWith(Sink.head)

    val result: HttpResponse = Await.result(future, 50 seconds)
}

As I said, I suspected the streaming nature of the response might be causing the problem anyway, so I changed my code to handle the chunks one by one, based on the only examples I could find:

object AkkaHttpExample extends App {

    implicit val system = ActorSystem("akka-http-example")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)

    val request: HttpRequest = HttpRequest(
        HttpMethods.GET,
        uri = "/1.1/statuses/sample.json"
    )

    Source.single(request)
        .via(connectionFlow)
        .map(_.entity.dataBytes)
        .flatten(FlattenStrategy.concat)
        .map(_.decodeString("UTF-8"))
        .runForeach(println _)
        .onComplete(_ => system.terminate())
}

This causes the application to terminate immediately. Removing the .onComplete clause keeps the ActorSystem and the application running, but nothing seems to actually happen :'(

Does anyone have any experience with this? The library has been one huge headache so far. Should I go back to Play + WS + Iteratees?

1 Answer

I think the first problem you are running into is that you are using HTTP (port 80) instead of HTTPS (port 443) for the connection. I was able to get things to work once I switched to HTTPS and passed a valid OAuth Authorization header. I also modified your example a bit to match the latest akka-http API. Here's what I got working:

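import akka.http.scaladsl.model.headers.RawHeader
import scala.util.Success

implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// TLS-enabled connection pool; each request is paired with a Long id
val connectionFlow = Http().cachedHostConnectionPoolTls[Long]("stream.twitter.com", 443)

// Note: myAuth is omitted here. Also using a RawHeader because I was
// too lazy to use the real Authorization model header
val authHeader = RawHeader("Authorization", myAuth)

// Assumed: the request from the question, with the auth header attached
val request = HttpRequest(
    HttpMethods.GET,
    uri = "/1.1/statuses/sample.json",
    headers = List(authHeader)
)

Source.single((request, 1L))
    .via(connectionFlow)
    .map {
      case (Success(resp), id) =>
        // Successful response: stream its body chunks
        resp.entity.dataBytes
      case other =>
        println(s"Got unexpected response: $other")
        Source.empty
    }
    .flatten(FlattenStrategy.concat)
    .map(_.decodeString("UTF-8"))
    .runForeach(println _)
    .onComplete { _ =>
      system.shutdown()
    }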
Answered 2015-08-21T12:31:38.767