我正在尝试创建一个 Java RSocket 通道。这是我的 SpringBoot 服务器端:
@MessageMapping("channel")
Flux<Payload> channel(Publisher<Payload> payloads) {
System.out.println("Received Request Channel.");
return Flux
.from(payloads)
.map(incomingPayload ->
DefaultPayload
.create("Channel Response: " + incomingPayload.getDataUtf8()));
}
}
这是我使用原始 RSocket 的客户端:
public class Client {
private final RSocket socket;
private final CompositeByteBuf metadata;
public static void main(String[] args) throws InterruptedException {
final String route = "channel";
Client client = new Client(route);
client.sendDataViaChannel();
}
public Client(final String route) {
this.socket = RSocketConnector.create()
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
.connect(TcpClientTransport.create("localhost", 8888))
.block();
// metadata for routing
this.metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
setupRoute(route);
System.out.println("Socket created!");
}
private void setupRoute(final String route) {
RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, List.of(route));
CompositeMetadataCodec.encodeAndAddMetadata(metadata,
ByteBufAllocator.DEFAULT,
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
routingMetadata.getContent());
}
private void sendDataViaChannel() throws InterruptedException {
Flux<Payload> payloads = Flux.range(1, 512).map(i -> DefaultPayload.create(
ByteBufAllocator.DEFAULT.buffer().writeBytes(("hello " + i).getBytes()), metadata));
socket.requestChannel(payloads)
.doOnNext(p -> System.out.println(
"Received Back: " + p.getDataUtf8()
))
.blockLast();
}
但是在启动客户端时,我收到此错误:
Exception in thread "main" io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.rsocket.util.DefaultPayload.create(DefaultPayload.java:107)
at com.davide.client.Client.lambda$sendDataViaChannel$1(Client.java:102)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:155)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:110)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at io.rsocket.core.RequestChannelRequesterFlux.handleRequestN(RequestChannelRequesterFlux.java:714)
at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:268)
at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:211)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129)
at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:365)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2519)
at com.davide.client.Client.sendDataViaChannel(Client.java:109)
at com.davide.client.Client.main(Client.java:37)
有人可以帮我找出问题所在吗?