下面的课程是我的工人 Verticle,我想在名为 events-config 的通道上从事件总线接收消息时执行阻塞代码。
目标是无限期地生成和发布 json 消息,直到我在 events-config 通道上收到停止操作消息。
我正在使用 executeBlocking 来实现所需的功能。然而,由于我无限期地运行阻塞操作,vertx 阻塞了线程检查器转储警告。
问题:
- 有没有办法只为特定的verticle禁用blockedthreadchecker??
- 下面的代码是否遵循在 vertx 中根据需要执行无限循环的最佳实践?如果不能,你能建议最好的方法吗?
public class WorkerVerticle extends AbstractVerticle {
Logger logger = LoggerFactory.getLogger(WorkerVerticle.class);
private MessageConsumer<Object> mConfigConsumer;
AtomicBoolean shouldPublish = new AtomicBoolean(true);
private JsonGenerator json = new JsonGenerator();
@Override
public void start() {
mConfigConsumer = vertx.eventBus().consumer("events-config", message -> {
String msgBody = (String) message.body();
if (msgBody.contains(PublishOperation.START_PUBLISH.getName()) && !mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to start producing data onto kafka " + msgBody);
vertx.<Void>executeBlocking(voidFutureHandler -> {
Integer numberOfMessagesToBePublished = 100000;
if (numberOfMessagesToBePublished <= 0) {
logger.info("Skipping message publish :"+numberOfMessagesToBePublished);
return; // is it best way to do it ??
}
publishData(numberOfMessagesToBePublished);
},false, voidAsyncResult -> logger.info("Blocking publish operation is terminated"));
} else if (msgBody.contains(PublishOperation.STOP_PUBLISH.getName()) && mJsonGenerator.isPublishOnGoing()) {
logger.info("Message received to terminate " + msgBody);
mJsonGenerator.terminatePublish();
}
});
}
private void publishData(){
while(shouldPublish.get()){
//code to generate json indefinitely until some one reset shouldPublish variable
}
}
}