I'm experimenting with Akka Streams and twitter4j-stream. The stream of tweets works fine until the moment it fails with an NPE, which makes no sense to me.
Initially I create a client:
val client: TwitterStream = {
  using(TwitterClient.clientFactory.getInstance()) { inst =>
    inst.setOAuthConsumer(auth.consumerKey, auth.consumerSecret)
    inst.setOAuthAccessToken(new AccessToken(auth.accessTokenKey, auth.accessTokenSecret))
  }
}
Ultimately, I want something with the following signature: val stream: Source[TwitterEvent, ShutdownHandler]
So I create a Reactive Streams Publisher for the Source to consume, fed by the client:
val pub = new Publisher[TwitterEvent] {
  override def subscribe(s: Subscriber[_ >: TwitterEvent]): Unit = {
    client.addListener(new TwitterListener(s))
    client.filter(filter)
  }
}
val stream: Source[TwitterEvent, ShutdownHandler] = {
  Source.fromPublisher(pub).mapMaterializedValue { _ =>
    new TwitterClient.ShutdownHandler(() => client.shutdown())
  }
}
So .filter() kicks the whole thing off, I receive a few tweets, and then it fails:
[ERROR] [04/08/2016 15:18:19.301] [System-akka.actor.default-dispatcher-4] [akka://System/user/StreamSupervisor-0/flow-0-0-foreachSink-foreachSink-map] It is illegal to throw exceptions from request(), rule 3.16
akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111)
...
Caused by: java.lang.NullPointerException
at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110)
In case it matters, here is my TwitterListener:
final class TwitterListener(s: Subscriber[_ >: TwitterEvent]) extends StatusAdapter {
  override def onStatus(status: Status): Unit = {
    if (status != null) {
      val message = new TwitterEvent(status)
      s.onNext(message)
    }
  }
  override def onException(ex: Exception): Unit = {
    s.onError(ex)
  }
}
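
One angle that may be worth checking, although the trace above does not confirm it: Reactive Streams rule 1.9 requires Publisher.subscribe to call onSubscribe on the subscriber before any other signal, and the hand-written publisher in this post never does, so the subscriber has no Subscription to call request() on. The sketch below shows a push-based publisher that hands out a Subscription first and only emits while there is demand. To stay dependency-free it uses the JDK's java.util.concurrent.Flow interfaces (JDK 9+), which mirror org.reactivestreams, rather than Akka or twitter4j. Event, PushSource, PushPublisher and Demo are hypothetical names, and silently dropping events when there is no demand is a simplification, not a recommendation.

```scala
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the TwitterEvent type used in the post.
final case class Event(text: String)

// Hypothetical push source standing in for twitter4j's listener callbacks.
final class PushSource {
  @volatile private var listener: Event => Unit = _ => ()
  def addListener(f: Event => Unit): Unit = listener = f
  def emit(e: Event): Unit = listener(e)
}

final class PushPublisher(source: PushSource) extends Publisher[Event] {
  override def subscribe(s: Subscriber[_ >: Event]): Unit = {
    val demand    = new AtomicLong(0L)
    val cancelled = new AtomicBoolean(false)

    // Rule 1.9: give the subscriber a Subscription before any other signal.
    // Simplification: non-positive requests are ignored instead of being
    // reported through onError, and demand overflow is not handled.
    s.onSubscribe(new Subscription {
      override def request(n: Long): Unit = if (n > 0) demand.addAndGet(n)
      override def cancel(): Unit = cancelled.set(true)
    })

    // Only start pushing once the subscription is in place.
    source.addListener { e =>
      // Emit while there is outstanding demand; excess events are dropped.
      if (!cancelled.get() && tryTake(demand)) s.onNext(e)
    }
  }

  // Atomically consume one unit of demand, if any is available.
  @tailrec
  private def tryTake(demand: AtomicLong): Boolean = {
    val cur = demand.get()
    if (cur <= 0) false
    else if (demand.compareAndSet(cur, cur - 1)) true
    else tryTake(demand)
  }
}

object Demo {
  // Subscribes with a demand of two, pushes three events, returns what arrived.
  def run(): List[String] = {
    val received = ListBuffer.empty[String]
    val source   = new PushSource
    new PushPublisher(source).subscribe(new Subscriber[Event] {
      override def onSubscribe(sub: Subscription): Unit = sub.request(2)
      override def onNext(e: Event): Unit = received += e.text
      override def onError(t: Throwable): Unit = ()
      override def onComplete(): Unit = ()
    })
    List("a", "b", "c").foreach(t => source.emit(Event(t)))
    received.toList
  }
}
```

Calling Demo.run() should yield only the first two events, since the subscriber requested two and the third arrives with no demand left. In the code from the post, the equivalent change would be to call s.onSubscribe(...) before client.addListener and client.filter; Akka's built-in Source.queue or Source.actorRef may be a simpler way to bridge a callback API without implementing Publisher by hand.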