您可以采取的一种方法是通过 EventBus 发送请求以开始收集。回应只是一个确认。您的收集器 Verticle 会将数据发送到原始请求所述的临时地址。在循环的每次迭代中,收集器都会检查可能由其他操作设置的请求的“isCanceled”标志。奖励轨道:将其包装成一个可流动的,因此您的业务逻辑不需要知道它下面的事件总线机制。我最近描述了类似的事情。
要非常清楚。当您的代码如下所示时:
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(promise -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
promise.complete(result); },
res -> { System.out.println("The result is: " + res.result());
});
没有什么会打断您的someAPI.blockingMethod("hello");
通话。你唯一能做的就是最终违背承诺。
但是,我认为这里就是这种情况,您编写了someAPI
(在某种程度上)。那么你可以做这样的事情:
public class CollectorVerticle extends AbstractVerticle {
private final ConcurrentMap <long, AtomicBoolean> cancelTracker = new ConcurrentHashMap<>();
...
在你的执行中:
final long key = Date.getTime(); // We use a Milisecond as key
executor.executeBlocking(promise -> {
// Call some blocking API that takes a significant amount of time to return
StringBuilder result = new StringBuilder();
someAPI.fixedblockingMethod(result, cancelTracker, key, "hello");
promise.complete(result.toString());
},
res -> {
if res.succeeded() {
System.out.println("The result is: " + res.result());
} else {
// Do whatever - check for error etc
}
cancelTracker.remove(key);
});
}
然后在你的方法中沿着这些思路:
fixedblockingMethod(StringBuilder b, Map cancelTracker, long key, String whatever) {
for (int i =0; i < eternity; i++) {
if (!cancelTracker.get(key).get() {
break;
}
b.append(getNextItemFromInterface();
}
}
这应该够了吧。我仍然会通过事件总线发送单个项目,而不是在阻塞方法上收集