We are using akka-stream-experimental_2.11, version 1.0.

Inspired by the example, we wrote a TCP receiver as follows:

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
      val serverFlow = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(message => {
        target ? new Message(message); ByteString.empty
      })
      conn handleWith serverFlow
    }

    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

However, our intention was for the receiver not to respond at all and only to sink the messages. (The TCP message publisher does not care about a response.)

Is it even possible to not respond at all, given that akka.stream.scaladsl.Tcp.IncomingConnection takes a flow of type Flow[ByteString, ByteString, Unit]?

If yes, some guidance will be much appreciated. Thanks in advance.

The following attempt passes my unit tests, but I am not sure it's the best idea:

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>

      val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))

      val targetSink = Flow[ByteString]
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(Message(_))
        .to(Sink(targetSubscriber))

      conn.flow.to(targetSink).runWith(Source(Promise().future))
    }

    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

1 Answer

You are on the right track. To keep the option of closing the connection at some point, hold on to the promise and complete it later. Once the promise is completed with an element, the source publishes that element and then completes. However, since you don't want any element to be published on the connection, you can use drop(1) to make sure the source never emits anything.

Here's an updated version of your example (untested):

val promise = Promise[ByteString]()
// this source will complete when the promise is fulfilled
// or it will complete with an error if the promise is completed with an error
val completionSource = Source(promise.future).drop(1)

completionSource  // only used to complete later
  .via(conn.flow) // I reordered the flow for better readability (arguably)
  .runWith(targetSink)

// to close the connection later complete the promise:
def closeConnection() = promise.success(ByteString.empty) // dummy element, will be dropped

// alternatively to fail the connection later, complete with an error
def failConnection() = promise.failure(new RuntimeException)
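
One detail worth keeping in mind with the promise-based approach is that a promise can only be completed once. The sketch below uses only the Scala standard library (no Akka) to illustrate the three states the promise above can be in, and what happens if a close is attempted twice. The object and value names are hypothetical and exist only for illustration.

```scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseSignalDemo {
  // Still open: nobody has completed the promise, so its future is pending.
  val open = Promise[String]()

  // Closed normally: completed with a dummy element
  // (in the answer above, drop(1) discards that element).
  val closed = Promise[String]()
  closed.success("dummy")

  // Closed with an error: the future fails instead of yielding a value.
  val failed = Promise[String]()
  failed.failure(new RuntimeException("closing with an error"))

  // Completing an already completed promise with success(...) throws
  // an IllegalStateException, which Try captures here.
  val secondSuccessThrows: Boolean = Try(closed.success("again")).isFailure

  // trySuccess(...) reports the failed attempt as false instead of throwing.
  val secondTrySuccess: Boolean = closed.trySuccess("again")
}
```

If closeConnection() might be called more than once, for example from several code paths, using promise.trySuccess(ByteString.empty) instead of promise.success(...) would likely be the safer choice.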
Answered 2015-07-27T14:40:56.990