我正在尝试通过这样的 Vertx 事件总线(与 Hazelcast 集群)发送大量消息而不会阻塞:
EventBus eb = vertx.eventBus();
for (int i = 0; i < 100; i++) {
vertx.setPeriodic(1, num -> {
eb.send("clusteredEndpoint", "ping");
});
}
当计时器数量较少时,它可以正常工作,但在大约 100 个计时器时,我会收到此错误。
我想知道如何在不阻塞的情况下扩展到 100K 事件/秒(作为参考,我编写了一个可能超过这个数字的 Vertx WebSocket 测试)。
如果不可能,我想了解什么是阻塞 - 看起来它是这个类中的东西:https ://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx /core/eventbus/impl/clustered/Serializer.java
供参考 - 此代码不会阻塞 - 即使有 1000 个计时器:
HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
for (int i = 0; i < 1000; i++) {
vertx.setPeriodic(1, num -> {
res.result().writeTextMessage("ping");
});
}
});
});
2020 年 12 月 15 日上午 10:54:38 io.vertx.core.impl.BlockedThreadChecker 警告:线程 Thread[vert.x-eventloop-thread-1,5,main] 已被阻止 36794 毫秒,时间限制为 2000 毫秒io.vertx.core.VertxException:线程在 io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java: 23) 在 io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) 在 io.vertx.core 的 io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133) .spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) 在 io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42) 在 io.vertx.core.eventbus .impl.clustered.ClusteredEventBus$$Lambda$1065/195695453.accept(未知来源)在 io.vertx.core.eventbus.impl.clustered。Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147) 在 io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94) 在 io.vertx.core.eventbus.impl。 clustered.Serializer$SerializerQueue.add(Serializer.java:114) 在 io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65) 在 io.vertx.core.eventbus.impl.clustered。 ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172) 在 io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127) 在 io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java: 394) 在 io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103) 在 io.vertx.core .eventbus.impl.EventBusImpl.send(EventBusImpl.java:97) 在 io.vertx。example.EBtestClient.lambda$start$0(EBtestClient.java:22) at io.vertx.example.EBtestClient$$Lambda$1056/1487417027.handle(Unknown Source) at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle( VertxImpl.java:939) 在 io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910) 在 io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52) 在 io.vertx .core.impl.ContextImpl.emit(ContextImpl.java:294) 在 io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) 在 io.vertx.core.impl.AbstractContext.emit(AbstractContext.java :49) 在 io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933) 在 io.netty.util 的 io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24)。 concurrent.PromiseTask.runTask(PromiseTask.java:98) 在 io.netty.util.concurrent.ScheduledFutureTask。在 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) 在 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) 在 io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) 运行(ScheduledFutureTask.java:176) .channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap .java:74) 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:748)在io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 在io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在io 运行(NioEventLoop.java:500) .netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:748)在io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 在io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 在io 运行(NioEventLoop.java:500) .netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 在 java.lang.Thread.run(Thread.java:748)