I am trying to get a non-blocking response from the Micronaut Kafka implementation, but the return value is not working.

public class ProductManager implements IProductManager {
    private final ApplicationContext applicationContext;

    public ProductManager(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public ProductViewModel findFreeText(String text) {
        final ProductViewModel model = new ProductViewModel();
        IProductProducer client = applicationContext.getBean(IProductProducer.class);
        client.findFreeText(text).subscribe(item -> {
            System.out.println(item);
        });
        return model;
    }
}

The subscribe callback never fires; the debugger never reaches that point. I want to get the value back from the Kafka listener.
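Separately from the callback not firing, I suspect the method above has a timing problem: it returns model right after registering the callback, so the caller would get an empty object even if a reply did arrive later. Below is a minimal sketch of that timing issue only, assuming nothing about Micronaut or Kafka. It uses the JDK's CompletableFuture as a stand-in for Flowable, and the class and method names (AsyncReturnSketch, fireAndForget, passThrough) are hypothetical, not part of my project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the code in the question; no Micronaut or Kafka involved.
public class AsyncReturnSketch {

    // Mirrors ProductManager.findFreeText: registers a callback, then returns immediately.
    public static String fireAndForget(CompletableFuture<String> reply) {
        final String[] holder = {"empty"};
        reply.thenAccept(item -> holder[0] = item); // runs only once the reply completes
        return holder[0];                           // read before the reply has arrived
    }

    // Alternative shape: hand the asynchronous type back to the caller instead of a plain value.
    public static CompletableFuture<String> passThrough(CompletableFuture<String> reply) {
        return reply.thenApply(item -> "found: " + item);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> reply = new CompletableFuture<>();

        String early = fireAndForget(reply);                  // reply is still pending here
        CompletableFuture<String> later = passThrough(reply);

        reply.complete("This is the test");                   // simulated late reply

        System.out.println(early);
        System.out.println(later.get(1, TimeUnit.SECONDS));
    }
}
```

If this reading is right, findFreeText in ProductManager would probably need to return a reactive type rather than ProductViewModel, but that alone does not explain why the subscribe callback is never reached.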

Kafka producer

@KafkaClient
public interface IProductProducer {
    
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    Flowable<ProductViewModel> findFreeText(String text);
}

Kafka listener

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class ProductListener {
    private static final Logger LOG = LoggerFactory.getLogger(ProductListener.class);

    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    public Flowable<Product> findByFreeText(String text) {
        LOG.info("Listening value = {}", text);
        
        return Flowable.just(new Product("This is the test", 0,"This is test description"));
    }
}

Micronaut documentation on non-blocking methods

https://docs.micronaut.io/1.0.0.M3/guide/index.html#_reactive_and_non_blocking_method_definitions
