4

我很难使用 Spring Reactor Stream API(类似于 rxjava)在我的服务中构造一个响应对象,该对象包装由两个下游服务提供的响应。

以下是accept()我的 Channel Consumer 上的方法。为了保护无辜者,一些名字已被更改。

@覆盖
公共无效接受(最终 NetChannel 频道){
    log.info("消耗 FullHttpRequest 的 NetChannel");
    尝试 {
        // 我们的初始 Stream 是包含我们的 XML 文档的单个 HTTP 请求。
        流请求流 = channel.in();

        requestStream.filter((x) -> {return true;})
                .dispatchOn(调度员)
                .map(new FooRequestFunction()) // 1)
                .flatMap(new BarRequestStreamFunction()) // 2)
                .flatMap(new DownstreamRequestZipFunction()) // 3)
                .toList() // 4)
                .onComplete(new ResponsesConsumer(channel)); // 5)

    } 捕捉(异常 e){
        log.error("Channel处理过程中抛出异常", e);
    }
}

因此,aFooRequest包装了 many BarRequests,每个都有一个关联的 Classify 请求和一个关联的 Validate 请求。我们想要 1) 转换为 a FooRequest,2) 转换FooRequest为一系列BarRequests, 3) 为每个 运行两个下游请求BarRequest, 4) 将我们所有的对象聚合BarResponse成一个整体响应, 5) 将响应发送回客户端。

我遇到问题的地方是toList()方法,它似乎永远不会执行。每次我尝试一些涉及 a 的事情时,Promise它似乎总是会崩溃,这也不例外。

FooRequestFunctionBarRequestStreamFunction相当简单,似乎运行良好。他们的方法签名是:

// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);

和:

// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);

DownstreamRequestZipFunction看起来像这样:

@覆盖
公共流应用(BarRequest t){
    流分类Res = 流
            .just(t)
            .flatMap(new ClassifyDownstreamRequestFunction());

    流 validateRes = 流
            .just(t)
            .flatMap(new ValidateDownstreamRequestFunction());

    return Streams.zip(classifyRes, validateRes, (tuple) -> {
        BarResponse 响应 = 新 BarResponse();
        response.setClassifyRes(tuple.getT1());
        response.setValidateRes(tuple.getT2());
        返回响应;
    });
}

只要两个下游请求函数都返回结果,这似乎工作正常。

最后,链式调用结束时的 Consumer 具有以下签名:

// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)

它的作用是 await() 响应承诺,然后将所有这些响应聚合到单个 XML 文档中写回通道。我可以说执行永远不会达到这种方法,因为没有任何日志记录会触发。这一切似乎都停止在 .toList() 上。

有谁知道为什么这个设置似乎会执行toList()或之后的任何事情?

编辑:好的,我有更多信息。在为应用程序中的每个线程提供命名约定以使调试更容易之后,我可以看到“shared-1”,运行 accept() 方法的线程进入 WAITING 状态,然后停留在那里。这可能与底层 Dispatcher 是一个单线程的环形缓冲区调度程序这一事实有关。

我修改了代码,使方法略有不同,并使用了多线程调度程序,并避免使用 a Promise,但我仍然有一个状态,即链式调用集的尾部不会执行。见下文:

@覆盖
公共无效接受(最终 NetChannel 频道){
    log.info("消耗 FullHttpRequest 的 NetChannel");
    尝试 {
        // 我们的初始 Stream 是包含我们的 XML 文档的单个 HTTP 请求。
        流请求流 = channel.in();

        requestStream.filter((x) -> {return true;})
                .dispatchOn(调度员)
                .map(new FooRequestFunction()) // 1)
                .flatMap(new BarRequestStreamFunction()) // 2)
                .flatMap(new DownstreamRequestZipFunction()) // 3)
                .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); return list;}) // 4)
                .consumeOn((x)->{log.info("Consume");}, (x)->{log.error("error");}, (x)->{log.info("完成" );}, 调度员); // 5)

    } 捕捉(异常 e){
        log.error("Channel处理过程中抛出异常", e);
    }
}

在上面,我将 toList() 替换为对 reduce() 的调用,并将所有内容折叠成一个List<BarResponse>. 我可以看到这个执行和记录很好。但是,无论我对最后一次调用做什么,在尝试了 consume()、consumeOn() 等之后 - 它永远不会执行,也永远不会记录您在上面看到的最终调用。

在 VisualVM 中,我可以看到调度程序线程都在与阻塞队列关联的同一个对象监视器上等待 - 换句话说,它们都在等待工作到达。就像 tail consumeOn() 调用被完全忽略一样。

我在这里做错了什么?我不明白什么?

编辑 2:鉴于 Johns 在下面的回复,我怀疑问题出在服务器设置上。可能仅适用于 reactor 版本 2.0.0.M2,在主Application类中配置如下:

@豆
    公共网络服务器 httpServer(
            最终环境环境,
            最终的 MetricRegistry 指标,
            最终 ChannelConsumer 消费者)抛出 InterruptedException {

        NetServer 服务器 = 新的 TcpServerSpec(
                NettyTcpServer.class)
                .env(环境)
                。选项(
                        新的 NettyServerSocketOptions()
                            .pipelineConfigurer((ChannelPipeline 管道) -> 管道
                                .addLast(新的 HttpServerCodec())
                                .addLast(新的 HttpObjectAggregator(MAX_CONTENT_LENGTH))))
                .consume(消费者)
                。得到();

        server.start().await();

        返回服务器;
    }

没有为此配置调度程序,它似乎在后台使用 LMAX 破坏器,而不是NettyEventLoopDispatcher. 目前尚不清楚如何设置NettyEventLoopDispatcher和使用它作为替代调度程序。

4

1 回答 1

0

consumeOn()调用是多余的,因为您已经在使用它Dispatcher(除非您的“真实”代码使用与此示例不同的东西)。当您编写输出时,它会在内部切换为NettyEventLoopDispatcher无论如何。

我做了一个快速检查,以确保此流程在 a 之外工作,TcpServer并且按预期工作:

@Test
public void testStream() throws InterruptedException {
    Stream<String> s1 = Streams.just("Hello World!");

    s1.filter(s -> s.startsWith("Hello"))
      .dispatchOn(Environment.sharedDispatcher())
      .map(s -> s.toUpperCase())
      .flatMap(s -> Streams.just(s, s))
      .flatMap(s -> Streams.just(s, s))
      .reduce(new ArrayList<>(), (l, s) -> {
          l.add(s);
          return l;
      })
      .consume(l -> {
          System.out.println("thread: " + Thread.currentThread() + ", l: " + l);
      });

    Thread.sleep(500);
}

通过简单地注释掉.dispatchOn()调用来了解流是否在 Netty 线程上工作会很有趣。

于 2015-03-11T18:25:37.097 回答