I recently became interested in trying out some of the streaming capabilities in the Scala world. This happened while I was reading up on the Iteratee API in Play 2.
However, people seem to think the Iteratee API is close to deprecation, and have recommended one of the following libraries instead:
- scalaz-stream
- akka-streams, or more specifically, akka-http
I didn't really feel like getting into the scalaz world, so I decided to check out akka-http.
Unfortunately, documentation seems to be very sparse on the subject of akka-http at the moment, and I'm having a lot of trouble getting everything to work.
As per usual, I chose everyone's favourite source of streaming data to play around with: Twitter.
Googling the subject mostly leads to one of two things:
- Matthias Nehlsen's excellent work on his BirdWatch project. Unfortunately, he still uses Iteratees.
- People using akka-streams with a ready-made Twitter client. I'd rather not do that, as I wouldn't learn much from it.
Performing a basic GET request with akka-http seems to be structured somewhat like this:
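import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AkkaHttpExample extends App {
  implicit val system = ActorSystem("akka-http-example")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("www.stackoverflow.com", 80)
  val request: HttpRequest = HttpRequest(
    HttpMethods.GET,
    uri = "/"
  )
  val future: Future[HttpResponse] =
    Source.single(request)
      .via(connectionFlow)
      .runWith(Sink.head)
  // Block until the response arrives (fine for a toy example)
  val result: HttpResponse = Await.result(future, 5.seconds)
}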
The code above works (though parsing the body of the results is pretty annoying for some reason).
When I try to do the same but point it at the /1.1/statuses/sample.json endpoint (which should emit an example stream), my Future just sits there and times out. While this may seem logical given the streaming nature of the data, I would expect a 404 instead, because at this point I'm not even doing proper OAuth.
For reference, this is my code at this point:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AkkaHttpExample extends App {
  implicit val system = ActorSystem("akka-http-example")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("stream.twitter.com", 80)
  val request: HttpRequest = HttpRequest(
    HttpMethods.GET,
    uri = "/1.1/statuses/sample.json"
  )
  val future: Future[HttpResponse] =
    Source.single(request)
      .via(connectionFlow)
      .runWith(Sink.head)
  // This is the call that times out
  val result: HttpResponse = Await.result(future, 50.seconds)
}
As I said, I figured the streaming nature of the endpoint might be causing the problem regardless, so I changed my code to handle the chunks one by one, based on the only examples I could find:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future

object AkkaHttpExample extends App {
  implicit val system = ActorSystem("akka-http-example")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("stream.twitter.com", 80)
  val request: HttpRequest = HttpRequest(
    HttpMethods.GET,
    uri = "/1.1/statuses/sample.json"
  )
  Source.single(request)
    .via(connectionFlow)
    .map(_.entity.dataBytes)          // one Source of ByteString chunks per response
    .flatten(FlattenStrategy.concat)  // concatenate them into a single stream of chunks
    .map(_.decodeString("UTF-8"))
    .runForeach(println _)
    .onComplete(_ => system.terminate())
}
This causes the application to terminate immediately. Removing the .onComplete clause keeps the ActorSystem and the application running, but nothing seems to actually happen :'(
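A side note on the chunk-by-chunk version above, separate from the main problem: even once data arrives, decoding each chunk on its own is fragile, because a chunk boundary can fall in the middle of a message or even in the middle of a multi-byte UTF-8 character. The sketch below shows the idea of buffering bytes until a full line is available, using plain Scala with no Akka involved. `LineFramer` and `frame` are names made up for this illustration, and the assumption that messages are separated by `\r\n` with blank lines acting as keep-alives is my understanding of the Twitter streaming format, not something verified here.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of akka-http: reassembles newline-delimited
// messages from byte chunks whose boundaries fall at arbitrary positions.
object LineFramer {

  /** Splits a sequence of byte chunks into complete UTF-8 lines.
    * Bytes after the last newline are returned as a final line if non-empty.
    */
  def frame(chunks: Iterator[Array[Byte]]): Vector[String] = {
    val lines   = Vector.newBuilder[String]
    val pending = ArrayBuffer.empty[Byte] // bytes of the line still being assembled

    def flush(): Unit = {
      // Decode only whole lines, so a multi-byte character is never cut in half.
      val text = new String(pending.toArray, UTF_8).stripSuffix("\r")
      if (text.nonEmpty) lines += text // blank lines (assumed keep-alives) are skipped
      pending.clear()
    }

    // 0x0A never occurs inside a multi-byte UTF-8 sequence, so splitting on it is safe.
    for (chunk <- chunks; b <- chunk) {
      if (b == '\n'.toByte) flush() else pending += b
    }
    flush() // emit any trailing partial line
    lines.result()
  }
}
```

For example, `LineFramer.frame("a\r\n\r\nb".getBytes("UTF-8").grouped(1))` yields the two lines `a` and `b`, regardless of how the bytes are grouped into chunks. In a real stream the same buffering would have to happen incrementally rather than over a finite iterator.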
Does anyone have any experience with this? The library has been one huge headache so far. Should I go back to Play + WS + Iteratees?