1

我在使用 scalatest 测试我的 Akka Streams 应用程序时遇到了 NullPointerException,但不明白原因……我可能遗漏了 Akka Streams 中的某些内容,因为我才刚开始接触它。

我使用 Scala 2.12.4 和 sbt 1.0.3,代码采用 scalatest 的通用结构。这是我的应用程序:

// Application entry point: wires a throttled source of random CDRs through a JSON
// serialisation flow into a MongoDB bulk-write sink and runs the stream.
//
// NOTE(review): this object extends App, so its body (every val below) is initialised
// only when main() runs. Code that reads these vals without going through main() --
// for example a test accessing CdrToMongoReactiveStream.randomCdrThrottledSource --
// sees them as null, which is consistent with the NullPointerException in the question.
object CdrToMongoReactiveStream extends App {

  // Implicit Akka infrastructure used to materialise and run the stream below.
  implicit val system = ActorSystem("cdr-data-generator")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext=materializer.executionContext
  // Brings the JSON format used by cdr.toJson into scope.
  import RandomCdrJsonProtocol._

  // Endless source of freshly generated RandomCdr values, throttled to `throughput`
  // elements per second (burst of 1, shaping mode).
  // NOTE(review): msisdnLength, timeRange and throughput are not defined in this
  // snippet -- presumably configuration values declared elsewhere; confirm.
  val randomCdrThrottledSource : Source[RandomCdr,NotUsed]= Source
    .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange)))
    .throttle(throughput,1.second,1,ThrottleMode.shaping)
    .named("randomCdrThrottledSource")

  // Serialises each RandomCdr to its JSON string representation.
  val cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= Flow[RandomCdr]
    .map((cdr: RandomCdr) => cdr.toJson.toString())
    .named("cdrJsonParseFlow")

  // Sink that parses each JSON string into a Document, wraps it in an InsertOneModel,
  // batches `bulkSize` models and issues one bulkWrite per batch; write results are ignored.
  // NOTE(review): bulkSize and collection are not defined in this snippet -- confirm
  // where they come from.
  val mongodbBulkSink : Sink[String,NotUsed] = Flow[String]
    .map((json: String) => Document.parse(json))
    .map((doc: Document) => new InsertOneModel[Document](doc))
    .grouped(bulkSize)
    .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒
      Source.fromPublisher(collection.bulkWrite(docs.toList.asJava))
    }
    .to(Sink.ignore)

  // Runs the whole pipeline; the sink was built with `.to`, so the materialised value is NotUsed.
  val f = randomCdrThrottledSource.via(cdrJsonParseFlow).runWith(mongodbBulkSink)
}

还有我的测试文件

/** Specs for the stream stages exposed by `CdrToMongoReactiveStream`.
  *
  * NOTE(review): `CdrToMongoReactiveStream` extends `App`, so its vals are only
  * initialised when its `main` runs. Reading them from this spec yields `null`,
  * which matches the NullPointerException reported for the first test.
  */
class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers {

  import RandomCdrJsonProtocol._

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      // Sink.head completes with the first element, so the endless source is not drained.
      val future = CdrToMongoReactiveStream.randomCdrThrottledSource
        // line 30 in the error
        .runWith(Sink.head)(CdrToMongoReactiveStream.materializer)

      val cdr = Await.result(future,10.second)
      cdr shouldBe a [RandomCdr]
    }
  }
  "cdrJsonParseFlow" should {
    "parse RandomCdr to correct json format" in {
      val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0)
      // Drive the flow under test with probes on both ends.
      val (pub,sub) = TestSource.probe[RandomCdr]
        .via(CdrToMongoReactiveStream.cdrJsonParseFlow)
        .toMat(TestSink.probe[String])(Keep.both)
        .run()

      sub.request(1)
      pub.sendNext(randomCdr)
      // Compare against the expected JSON string directly: `shouldBe equal(x)` would
      // compare the emitted String with a matcher object and therefore never pass.
      sub.expectNext() shouldBe randomCdr.toJson.toString()
    }
  }
}

和错误信息

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
    at CdrToMongoReactiveStreamSpec.$anonfun$new$2(CdrToMongoReactiveStreamSpec.scala:30)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
    at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
    at org.scalatest.Suite.run(Suite.scala:1147)
    at org.scalatest.Suite.run$(Suite.scala:1129)
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
    at org.scalatest.WordSpec.run(WordSpec.scala:1881)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
4

1 回答 1

2

我通过在主程序之外声明 Source、Flow 和 Sink 解决了这个问题:

/** Stream stages for generating random CDRs and bulk-writing them to MongoDB.
  *
  * The stages are exposed as plain factory methods on an ordinary object (no `App`),
  * so they can be used from tests without running `main`.
  */
object CdrToMongoReactiveStream {

  /** Endless source of randomly generated CDRs.
    *
    * @param msisdnLength passed through to `RandomCdr(msisdnLength, timeRange)`
    * @param timeRange    passed through to `RandomCdr(msisdnLength, timeRange)`
    * @param throughput   number of elements allowed per second (burst of 1, shaping mode)
    * @return a source that never completes on its own
    */
  def randomCdrThrottledSource(msisdnLength : Int,timeRange : Int, throughput : Int): Source[RandomCdr,NotUsed]= {
    Source
      .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange)))
      .throttle(throughput,1.second,1,ThrottleMode.shaping)
      .named("randomCdrThrottledSource")
  }

  /** Flow that serialises each `RandomCdr` to its JSON string representation. */
  def cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= {
    // The JSON format is only needed here, so keep the import local to the method.
    import RandomCdrJsonProtocol._

    Flow[RandomCdr]
      .map((cdr: RandomCdr) => cdr.toJson.toString())
      .named("cdrJsonParseFlow")
  }

  /** Sink that inserts incoming JSON documents into MongoDB in batches.
    *
    * @param collection target collection for the bulk writes
    * @param bulkSize   maximum number of insert models sent per `bulkWrite` call
    * @return a sink whose materialised value is `NotUsed`; bulk-write results are discarded
    */
  def mongodbBulkSink(collection : MongoCollection[Document], bulkSize : Int) : Sink[String,NotUsed] = {

    Flow[String]
      .map((json: String) => Document.parse(json))
      .map((doc: Document) => new InsertOneModel[Document](doc))
      .grouped(bulkSize)
      .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) =>
        // One bulkWrite per batch; its publisher is flattened back into the stream.
        Source.fromPublisher(collection.bulkWrite(docs.toList.asJava))
      }
      .to(Sink.ignore)
  }

  /** Builds and runs the full pipeline: source -> JSON flow -> MongoDB sink.
    *
    * NOTE(review): msisdnLength, timeRange, throughput, collection, bulkSize, logger and
    * the implicit materializer are not defined in this snippet -- presumably declared
    * elsewhere in the file; confirm.
    */
  def main(args: Array[String]): Unit = {
    // The sink materialises NotUsed, so there is nothing worth binding to a local here.
    randomCdrThrottledSource(msisdnLength,timeRange,throughput)
      .via(cdrJsonParseFlow)
      .runWith(mongodbBulkSink(collection,bulkSize))

    logger.info("Generated random data")
  }
}

和测试文件

/** Specs for the stream stages exposed by `CdrToMongoReactiveStream`.
  *
  * The spec owns its own `ActorSystem` and materializer, and shuts the system down once
  * all tests have run so that no actor threads outlive the suite.
  */
class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers with org.scalatest.BeforeAndAfterAll {

  import CdrToMongoReactiveStream._
  import RandomCdrJsonProtocol._

  // Explicit types on implicit vals keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem("cdr-data-generator")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Shared fixtures: an in-memory (Fongo) collection and a fixed sample CDR.
  val collection = new Fongo("mongo test server").getDB("cdrDB").getCollection("cdr")
  val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0)

  /** Terminates the actor system created for this suite. */
  override protected def afterAll(): Unit =
    try Await.result(system.terminate(), 10.seconds)
    finally super.afterAll()

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      // Sink.head completes with the first element, so the endless source is not drained.
      val future = CdrToMongoReactiveStream
        .randomCdrThrottledSource(msisdnLength = 8, timeRange = 86400000, throughput = 1)
        .runWith(Sink.head)

      val cdr = Await.result(future,5.second)
      cdr shouldBe a [RandomCdr]
    }
  }
}
于 2017-11-27T17:35:47.840 回答