3

有一个广播器,它接受字符串并将它们附加到 StringBuilder。

我想测试一下。

我不得不Thread#sleep等待,而广播员完成字符串的处理。我想删除sleep.

我尝试使用Control#debug()不成功。

public class BroadcasterUnitTest {

@Test
public void test() {
    //prepare
    Environment.initialize();
    Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher)
    StringBuilder sb = new StringBuilder();
    sink
            .observe(s -> sleep(100)) //long-time operation
            .consume(sb::append);

    //do
    sink.onNext("a");
    sink.onNext("b");

    //assert
    sleep(500);//wait while broadcaster finished (if comment this line then the test will fail)
    assertEquals("ab", sb.toString());
}

private void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
}
4

2 回答 2

2

我不熟悉Broadcaster(并且由于问题很旧,它可能已被弃用),但是这三种方法通常会有所帮助:

  1. 在测试Project-ReactoresFlux和其他东西时,您可能最好使用专门为此制作的测试库。他们的参考资料和那部分的Javadoc非常好,我将在这里复制一个不言自明的示例:

    @Test
    public void testAppendBoomError() {
      Flux<String> source = Flux.just("foo", "bar"); 
      StepVerifier.create( 
        appendBoomError(source)) 
        .expectNext("foo") 
        .expectNext("bar")
        .expectErrorMessage("boom") 
        .verify(); 
    }
    
  2. 你可以block()自己在Fluxes 和Monos 上然后运行检查。请注意,如果发出错误,这将导致异常。但是有一种感觉,您会发现自己需要为某些情况编写更多代码(例如,检查Flux已发出 2 个项目然后因错误而终止) XY然后您将重新实现StepVerifier.

    @Test
    public void testFluxOrMono() {
      Flux<String> source = Flux.just(2, 3);
      List<Integer> result = source
            .flatMap(i -> multiplyBy2Async(i))
            .collectList()
            .block();
      // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap`
      // Or with a Mono:
      Integer resultOfMono = Mono.just(5)
            .flatMap(i -> multiplyBy2Async(i))
            .map(i -> i * 4)
            .block();
      // run your asserts on the integer
    }
    
  3. 您可以使用通用解决方案进行异步测试,例如CountDownLatch,但同样不建议这样做,并且在某些情况下会给您带来麻烦。例如,如果您事先不知道接收器的数量,则需要使用其他东西。

于 2017-04-17T07:09:57.143 回答
0

根据上面的答案,我发现有blockLast()帮助。

@Test
public void MyTest()
{
    Logs.Info("Start test");    
 
    /* 1 */
    // Make a request
    WebRequest wr1 = new WebRequest("1", "2", "3", "4");
    String json1 = wr1.toJson(wr1);
    
    Logs.Info("Flux");
    Flux<String> responses = controller.getResponses(json1);

    /* 2 */
    Logs.Info("Responses in");
    responses.subscribe(s -> mySub.myMethod(s)); // Test for strings is in myMethod
    
    Logs.Info("Test thread sleeping");
    Thread.sleep(2000);
    
    /* 3 */
    Logs.Info("Test thread blocking");
    responses.blockLast();
    
    Logs.Info("Finish test");
}
于 2022-01-28T12:08:15.460 回答