我使用 akka-stream-typed 创建了一个双向 tcp 流。
void createSocket(ActorSystem<Void> actorSystem, ActorRef<SomeApi> ownerRef, String serverIp, int serverPort) {
Tcp.get(actorSystem).outgoingConnection(serverIp, serverPort)
.join(createSocketFlow(ownerRef))
.run(actorSystem);
}
Flow<ByteString, ByteString, NotUsed> createSocketFlow(ActorRef<SomeApi> socketOwner) {
return Flow.fromGraph(GraphDSL.create(builder -> {
Source<ByteString, ActorRef<ByteString>> pushSource =
ActorSource.<ByteString>actorRef(elem -> false,
m -> Optional.empty(),
40,
OverflowStrategy.dropBuffer())
.mapMaterializedValue(param -> {
socketOwner.tell(SetSocketPushRef.of(param));
return param;
});
SourceShape<ByteString> source = builder.add(pushSource);
Sink<ByteString, NotUsed> in = Flow.of(ByteString.class)
.to(ActorSink.<ByteString, SomeApi, ByteString>actorRefWithBackpressure(socketOwner,
(socketRef, bytes) -> ReceivedSocketPacket.of(bytes.toArray()),
SocketInitiated::of,
SocketCompleted.INSTANCE,
SocketError::of));
SinkShape<ByteString> input = builder.add(in);
return FlowShape.of(input.in(), source.out());
}));
}
因为我需要一个双向套接字(它发送和接收字节),所以我必须创建一个源引用。
此流程工作正常,但是当创建的套接字数超过 1000 时,一些套接字往往会意外完成,而没有来自套接字所有者的信号已发送。
关于原因的任何线索?