1

我正在尝试创建一个 Java RSocket 通道。这是我的 SpringBoot 服务器端:

@MessageMapping("channel")
    Flux<Payload> channel(Publisher<Payload> payloads) {

        System.out.println("Received Request Channel.");

        return Flux
                .from(payloads)
                .map(incomingPayload ->
                        DefaultPayload
                                .create("Channel Response: " + incomingPayload.getDataUtf8()));
    }
}

这是我使用原始 RSocket 的客户端:

public class Client {

    private final RSocket socket;
    private final CompositeByteBuf metadata;

    public static void main(String[] args) throws InterruptedException {

        final String route = "channel";


        Client client = new Client(route);
        client.sendDataViaChannel();        
    }

    public Client(final String route) {

        this.socket = RSocketConnector.create()
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
                .connect(TcpClientTransport.create("localhost", 8888))
                .block();

        // metadata for routing
        this.metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
        setupRoute(route);

        System.out.println("Socket created!");
    }

    private void setupRoute(final String route) {
        RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, List.of(route));
        CompositeMetadataCodec.encodeAndAddMetadata(metadata,
                ByteBufAllocator.DEFAULT,
                WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
                routingMetadata.getContent());

    }

 private void sendDataViaChannel() throws InterruptedException {

        Flux<Payload> payloads = Flux.range(1, 512).map(i -> DefaultPayload.create(
                ByteBufAllocator.DEFAULT.buffer().writeBytes(("hello " + i).getBytes()), metadata));

        socket.requestChannel(payloads)
                .doOnNext(p -> System.out.println(
                        "Received Back: " + p.getDataUtf8()
                ))
                .blockLast();
}

但是在启动客户端时,我收到此错误:

Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at io.rsocket.util.DefaultPayload.create(DefaultPayload.java:107)
    at com.davide.client.Client.lambda$sendDataViaChannel$1(Client.java:102)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:155)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:110)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
    at io.rsocket.core.RequestChannelRequesterFlux.handleRequestN(RequestChannelRequesterFlux.java:714)
    at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:268)
    at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:211)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129)
    at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:365)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2519)
        at com.davide.client.Client.sendDataViaChannel(Client.java:109)
        at com.davide.client.Client.main(Client.java:37)

有人可以帮我找出问题所在吗?

4

0 回答 0