0

我正在尝试通过这样的 Vertx 事件总线(与 Hazelcast 集群)发送大量消息而不会阻塞:

EventBus eb = vertx.eventBus();

for (int i = 0; i < 100; i++) {
  vertx.setPeriodic(1, num -> {
    eb.send("clusteredEndpoint", "ping");
  });
}

当计时器数量较少时,它可以正常工作,但在大约 100 个计时器时,我会收到此错误。

我想知道如何在不阻塞的情况下扩展到 100K 事件/秒(作为参考,我编写了一个可能超过这个数字的 Vertx WebSocket 测试)。

如果不可能,我想了解什么是阻塞 - 看起来它是这个类中的东西:https ://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx /core/eventbus/impl/clustered/Serializer.java

供参考 - 此代码不会阻塞 - 即使有 1000 个计时器:

HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
  for (int i = 0; i < 1000; i++) {
    vertx.setPeriodic(1, num -> {
      res.result().writeTextMessage("ping");
    });
  }
});
});

2020 年 12 月 15 日上午 10:54:38 io.vertx.core.impl.BlockedThreadChecker 警告:线程 Thread[vert.x-eventloop-thread-1,5,main] 已被阻止 36794 毫秒,时间限制为 2000 毫秒io.vertx.core.VertxException:线程在 io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java: 23) 在 io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 在 io.vertx.core 的 io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133) .spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) 在 io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42) 在 io.vertx.core.eventbus .impl.clustered.ClusteredEventBus$$Lambda$1065/195695453.accept(未知来源)在 io.vertx.core.eventbus.impl.clustered。Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147) 在 io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94) 在 io.vertx.core.eventbus.impl。 clustered.Serializer$SerializerQueue.add(Serializer.java:114) 在 io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65) 在 io.vertx.core.eventbus.impl.clustered。 ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172) 在 io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127) 在 io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java: 394) 在 io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103) 在 io.vertx.core .eventbus.impl.EventBusImpl.send(EventBusImpl.java:97) 在 io.vertx。example.EBtestClient.lambda$start$0(EBtestClient.java:22) at io.vertx.example.EBtestClient$$Lambda$1056/1487417027.handle(Unknown Source) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle( VertxImpl.java:939) 在 io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910) 在 io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52) 在 io.vertx .core.impl.ContextImpl.emit(ContextImpl.java:294) 在 io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) 在 io.vertx.core.impl.AbstractContext.emit(AbstractContext.java :49) 在 io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933) 在 io.netty.util 的 io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24)。 concurrent.PromiseTask.runTask(PromiseTask.java:98) 在 io.netty.util.concurrent.ScheduledFutureTask。在 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTask​​s(SingleThreadEventExecutor.java:472) 在 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTask​​s(SingleThreadEventExecutor.java:472) 在 io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) 运行(ScheduledFutureTask.java:176) .channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap .java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:748)在io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 在io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在io 运行(NioEventLoop.java:500) .netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:748)在io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 在io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在io 运行(NioEventLoop.java:500) .netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:748)

4

2 回答 2

1

首先,您将在同一个线程上运行 100 个任务,因为 Vert.x 具有线程关联性。如果您想避免这种情况,请在单独的 Verticle 上运行它们。但是,我不认为你有 100 个 CPU,所以会有很多争用。

并且将它们全部设置为每 1 毫秒执行一次意味着它们需要在 10 微秒内完成,其中包括网络代码,因为您使用的是集群 EventBus。

所以,这是测试的编写方式,而不是 Vert.x 在做什么。

如果您真的想测试这种负载(我们在这里谈论 100K rps),请将您的请求分散到多台机器上。

但是事件,我不确定 Hazelcast 是否可以处理这种负载。

如果您想知道真正阻塞的内容,我的猜测是这部分代码:

https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java#L43

由于我没有容易设置的集群 Vert.x,但我无法确认我的假设是否正确。

于 2020-12-16T13:22:26.293 回答
0

这是我进一步调查后的分析:

当使用 Vertx 事件总线进行远程通信时,一旦消费者不堪重负,它就会停止响应。这会导致生产者阻塞,我捕获了 3 条不同的阻塞消息(见下文)。在阻塞警告之后有这个警告:

警告:服务器 2d1fb2ce-940f-4b60-bf60-39847f31bcaf 没有乒乓球 - 会认为它死了

我的问题的答案是“为什么”它阻塞并不重要,因为它已经死了(因为它达到了一定的限制)。

我很惊讶 Vert.x 没有更优雅地处理这个问题——比如可能抛出异常。

阻塞错误 #1

线程在 io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) 在 io.vertx.core 的 io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) 阻塞.impl.future.FutureImpl.onComplete(FutureImpl.java:133) 在 io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 在 io.vertx.core.spi.cluster.impl.selector .Selectors.withSelector(Selectors.java:48) 在

阻塞错误 #2

io.vertx.core.VertxException:线程在 sun.nio.cs.UTF_8 的 java.nio.charset.CharsetEncoder.(CharsetEncoder.java:233) 处的 java.nio.charset.CharsetEncoder.(CharsetEncoder.java:198) 处阻塞$Encoder.(UTF_8.java:558) 在 sun.nio.cs.UTF_8$Encoder.(UTF_8.java:554) 在 sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)

阻塞错误 #3

io.vertx.core.VertxException:线程在 io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote( ClusteredEventBus.java:332) 在 io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)

于 2021-01-04T16:47:36.983 回答