1

我通过从文件中读取内容创建了一个 akka-stream 源。

// Actor system that hosts every stream and HTTP connection pool created below.
implicit val system: ActorSystem = ActorSystem("reactive-process")
// Materializer used to run the stream graphs; created from the implicit actor system above.
implicit val materializer: ActorMaterializer = ActorMaterializer()
// Execution context for Future callbacks; reuses the actor system's default dispatcher.
implicit val ec: ExecutionContextExecutor = system.dispatcher

/** Correlation value that travels alongside each HTTP request through the pool flow.
  *
  * @param data the raw input line the request was built from
  */
final case class RequestCaseClass(data: String)

/** Builds a cached host-level connection pool flow that pairs every request with a
  * [[RequestCaseClass]] so responses can be correlated with the input that caused them.
  *
  * @param host target host name
  * @param port target port as a decimal string; it is converted with `toInt`, so a
  *             non-numeric value throws `NumberFormatException`
  */
def httpPoolFlow(host: String, port: String) =
  // cachedHostConnectionPool takes an Int port, so the string is converted here to keep
  // this method's signature unchanged for existing callers.
  // NOTE(review): `httpMat` is not defined in the visible code; presumably a Materializer — confirm.
  Http().cachedHostConnectionPool[RequestCaseClass](host, port.toInt)(httpMat)

/** Creates a source that emits the file at `filePath` line by line as UTF-8 strings.
  *
  * Lines are split on the platform line separator and may be at most 10000 bytes long;
  * a longer line fails the stream.
  *
  * @param filePath path of the file to read
  */
def makeFileSource(filePath: String) =
  FileIO
    // fromPath replaces the deprecated fromFile.
    .fromPath(new File(filePath).toPath)
    // allowTruncation = true emits a final line that has no trailing separator instead of
    // failing the whole stream at end of file.
    .via(
      Framing.delimiter(
        ByteString(System.lineSeparator),
        maximumFrameLength = 10000,
        allowTruncation = true
      )
    )
    .map(_.utf8String)


/** Turns each input line into a `(request, context)` pair for the connection pool flow.
  *
  * Lines for which `getCmsRequest` does not yield a `Success` are dropped without
  * any further signal.
  */
def requestFramer =
  Flow[String]
    .map { dataRow =>
      // Build the context once and reuse it for both the request and the correlation value.
      val context = RequestCaseClass(dataRow)
      (getCmsRequest(context), context)
    }
    // collect keeps only successful requests and unwraps them in one step, avoiding `.get`.
    .collect { case (Success(httpRequest), context) => (httpRequest, context) }

/** Consumes each pool response and reports whether the request succeeded.
  *
  * Emits `true` for a 200 response and `false` for any other status or for a failed
  * request. The response entity is always consumed before the element is emitted, so
  * the connection it belongs to can be reused by the pool.
  *
  * Responses are handled one at a time (`parallelism = 1`) and without blocking a
  * dispatcher thread. Reading the body of a non-200 response is limited to 60 seconds.
  */
def responseHandler =
  Flow[(Try[HttpResponse], RequestCaseClass)].mapAsync(parallelism = 1) {
    case (Success(response), request) if response.status.intValue() == 200 =>
      // Drain the body and only then report success.
      response.entity.dataBytes
        .runWith(Sink.ignore)(materializer)
        .map { _ =>
          logger.info(s"Success response for: ${request.data}")
          true
        }(materializer.executionContext)

    case (Success(response), request) =>
      // Read the whole body asynchronously so it can be included in the warning.
      response.entity
        .toStrict(60.seconds)(materializer)
        .map { strict =>
          logger.warn(s"Non-200 response for: ${request.data} r: ${strict.data.utf8String}")
          false
        }(materializer.executionContext)

    case (Failure(t), request) =>
      logger.error(s"Failed completing request for: ${request.data} e: ${t.getMessage}")
      scala.concurrent.Future.successful(false)
  }

现在,当在可运行图(RunnableGraph)中使用它时,我希望整个流在读到 makeFileSource(fp) 所创建的源对应文件的末尾时停止。

// Runs the whole pipeline: file lines -> requests -> HTTP pool -> response handling.
// NOTE(review): httpPoolFlow is declared with (host, port) parameters; they need to be supplied here.
makeFileSource(fp)
  .via(requestFramer)
  .via(httpPoolFlow)
  .via(responseHandler)
  .runWith(Sink.ignore)
  // The materialized Future completes when the file has been fully processed or the stream
  // fails. The actor system keeps running until it is terminated explicitly, so shut it down
  // here and log a failure instead of dropping it.
  .onComplete { result =>
    result.failed.foreach(t => logger.error(s"Stream terminated with failure: ${t.getMessage}"))
    system.terminate()
  }

流当前不会在到达文件末尾时停止。

4

0 回答 0