1

我正在尝试找到一种创建热流的方法,我可以在其中以一种方法插入数据,而订阅者可以通过另一种方法获取数据。我已成功使用 a WorkQueueProcessor,但我不确定这是否是正确的做法。是否可以使用 Flux.create 做同样的事情?这是我的工作片段:

  1. 称呼connect();
  2. 向服务器发送字节数据,客户端将收到tcp服务器的响应并workQueueProcessor发出数据。

    @Component
    @RequiredArgsConstructor
    public class TcpCli {
    
        @Setter
        private Connection connection;
        private NettyOutbound out;
    
        //Creation of Work Queue Processor, can a Flux.create here can do the same job ?
        private WorkQueueProcessor<String> workQueueProcessor = WorkQueueProcessor.<String>builder().build();
        public Mono<? extends Connection> connect() {
            return TcpClient.create()
                .host(tcpConfig.getHost())
                .port(tcpConfig.getPort())
                .handle(this::handleConnection)
                .connect();
        }
        public Mono<String> sendData(ByteArray data) {
            out.sendByteArray(Mono.just(data)).then().subscribe();
    
            //Get emitted data from workQueueProcessor
            return workQueueProcessor.next();
        }
        private Publisher<Void> handleConnection(NettyInbound in, NettyOutbound out) {
            this.out = out;
            in.receive().asString()
                .log("In received")
                .subscribe(str -> {
                    LOGGER.info(String.format("Inbound: %s", str));
    
                    //Emit data to workQueueProcessor
                    workQueueProcessor.onNext(str);
                });
            return out
                .neverComplete()    //keep connection alive
                .log("Never close");
        }
    }
    
4

2 回答 2

0

你能解释一下@Setter 在这里做了什么吗?我尝试相同的代码,它总是给出

@Setter
private Connection connection;
private NettyOutbound out;
于 2019-03-15T01:29:07.953 回答
0

如果您有多个输入源和多个订阅者,我认为您的方法是正确的。

如果 NettyInbound 是您的 Steam 的唯一输入源,那么您不需要使用任何处理器。只需订阅它。

如果您的流有多个输入源,并且只有一个订阅者,在您的情况下为 NettyOutbound,您可以尝试轻量级的“UnicastProcessor”。

于 2018-12-26T05:03:16.110 回答