0

我使用 akka-stream-typed 创建了一个双向 tcp 流。

void createSocket(ActorSystem<Void> actorSystem, ActorRef<SomeApi> ownerRef, String serverIp, int serverPort) {
    Tcp.get(actorSystem).outgoingConnection(serverIp, serverPort)
            .join(createSocketFlow(ownerRef))
            .run(actorSystem);
}

Flow<ByteString, ByteString, NotUsed> createSocketFlow(ActorRef<SomeApi> socketOwner) {
    return Flow.fromGraph(GraphDSL.create(builder -> {

        Source<ByteString, ActorRef<ByteString>> pushSource =
                ActorSource.<ByteString>actorRef(elem -> false,
                                m -> Optional.empty(),
                                40,
                                OverflowStrategy.dropBuffer())
                        .mapMaterializedValue(param -> {
                            socketOwner.tell(SetSocketPushRef.of(param));
                            return param;
                        });

        SourceShape<ByteString> source = builder.add(pushSource);

        Sink<ByteString, NotUsed> in = Flow.of(ByteString.class)
                .to(ActorSink.<ByteString, SomeApi, ByteString>actorRefWithBackpressure(socketOwner,
                        (socketRef, bytes) -> ReceivedSocketPacket.of(bytes.toArray()),
                        SocketInitiated::of,
                        SocketCompleted.INSTANCE,
                        SocketError::of));

        SinkShape<ByteString> input = builder.add(in);

        return FlowShape.of(input.in(), source.out());
    }));
}

因为我需要一个双向套接字(它发送和接收字节),所以我必须创建一个源引用。

此流程工作正常,但是当创建的套接字数超过 1000 时,一些套接字往往会意外完成,而没有来自套接字所有者的信号已发送。

关于原因的任何线索?

4

0 回答 0