0

我已经通过事件总线实现了服务代理类,它有 2 个方法(收集和取消收集方法需要 4 到 5 分钟才能完成,因为它通过网络并从设备收集数据。

但是在收集数据之间,如果我在服务代理上调用取消,这通常会停止从设备收集数据而不是调用

cancel 方法正在等待完成collect 方法,然后request 进入cancel

我有2 个 verticles一个用于处理HTTP 请求,第二个用于收集数据

CollcetionServiceProxy 类通过 CollectionVerticle 暴露在事件总线上,我将其部署为工作人员 verticle

在部署时,如果我部署2 个 CollectionVerticle 实例, 它工作正常但我不想那样做。

如何使用单个 Verticle 集合实例解决此问题。

4

2 回答 2

0

您可以采取的一种方法是通过 EventBus 发送请求以开始收集。回应只是一个确认。您的收集器 Verticle 会将数据发送到原始请求所述的临时地址。在循环的每次迭代中,收集器都会检查可能由其他操作设置的请求的“isCanceled”标志。奖励轨道:将其包装成一个可流动的,因此您的业务逻辑不需要知道它下面的事件总线机制。我最近描述了类似的事情。

要非常清楚。当您的代码如下所示时:

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(promise -> { 
// Call some blocking API that takes a significant amount of time to return
 String result = someAPI.blockingMethod("hello");
 promise.complete(result); },
 res -> { System.out.println("The result is: " + res.result());
});

没有什么会打断您的someAPI.blockingMethod("hello");通话。你唯一能做的就是最终违背承诺。

但是,我认为这里就是这种情况,您编写了someAPI(在某种程度上)。那么你可以做这样的事情:

public class CollectorVerticle extends AbstractVerticle {
    private final ConcurrentMap <long, AtomicBoolean> cancelTracker = new ConcurrentHashMap<>();
...

在你的执行中:

final long key = Date.getTime(); // We use a Milisecond as key
executor.executeBlocking(promise -> { 
// Call some blocking API that takes a significant amount of time to return
    StringBuilder result = new StringBuilder();
someAPI.fixedblockingMethod(result, cancelTracker, key, "hello");
 promise.complete(result.toString());
 },
 res -> {
    if res.succeeded() {
       System.out.println("The result is: " + res.result());
    } else {
      // Do whatever - check for error etc
    }
    cancelTracker.remove(key);
});

} 

然后在你的方法中沿着这些思路:

fixedblockingMethod(StringBuilder b, Map cancelTracker, long key, String whatever) {

  for (int i =0; i < eternity; i++) {
      if (!cancelTracker.get(key).get() {
        break;
      }
      b.append(getNextItemFromInterface();
  }
}

这应该够了吧。我仍然会通过事件总线发送单个项目,而不是在阻塞方法上收集

于 2020-01-05T14:25:00.720 回答
0

基于您的代码示例

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool"); 
  executor.executeBlocking(promise -> { // Call some blocking API that takes significant amount of time to return String result = 
  someAPI.blockingMethod("hello"); 
  promise.complete(result); 
}, res -> { 
  System.out.println("The result is: " + res.result()); 
});

您正在寻找停止阻塞代码,按其名称,阻塞代码在它结束之前无法停止

所以一种方法是定期检查这个阻塞代码是否应该被取消

一种实现是让你的阻塞代码返回一个可完成的未来,并在事件中取消它

于 2020-01-05T15:06:41.203 回答