I am trying to implement the following behavior for coordinated cluster events:
- a timer (event) is executed in only one thread/JVM of the Payara Micro cluster;
- if that node goes down, the timer (event) is executed on another node of the cluster.
Persistent timers are not coordinated across the Payara Micro cluster. They are always executed on an instance with the same name as the one that created the timer.
and
If that instance goes down, the timer will be recreated on another instance with the same name once it joins the cluster. Until then, the timer becomes inactive.
Based on this definition, it seems that persistent timers cannot provide the desired behavior in a Payara Micro cluster.
Therefore, I am trying to use IScheduledExecutorService from Hazelcast, which looks like a perfect match.
Basically, the implementation based on IScheduledExecutorService works well, except for the scenario where a new Payara Micro node starts up and joins a cluster that already has events scheduled via IScheduledExecutorService. During that period, the following exceptions occur:
Exception 1: java.lang.RuntimeException: ConcurrentRuntime not initialized
[2021-02-15T23:00:31.870+0800] [] [INFO] [] [fish.payara.nucleus.cluster.PayaraCluster] [tid: _ThreadID=63 _ThreadName=hz.angry_yalow.event-5] [timeMillis: 1613401231870] [levelValue: 800] [[
Data Grid Status
Payara Data Grid State: DG Version: 4 DG Name: testClusterDev DG Size: 2
Instances: {
DataGrid: testClusterDev Name: testNode0 Lite: false This: true UUID: 493b19ed-a58d-4508-b9ef-f5c58e05b859 Address: /10.41.0.7:6900
DataGrid: testClusterDev Lite: false This: false UUID: f12342bf-a37e-452a-8c67-1d36dd4dbac7 Address: /10.41.0.7:6901
}]]
[2021-02-15T23:00:32.290+0800] [] [WARNING] [] [com.hazelcast.internal.partition.operation.MigrationRequestOperation] [tid: _ThreadID=160 _ThreadName=ForkJoinPool.commonPool-worker-6] [timeMillis: 1613401232290] [levelValue: 900] [[
[10.41.0.7]:6900 [testClusterDev] [4.1] Failure while executing MigrationInfo{uuid=fc68e9ac-1081-4f9b-a70a-6fb0aae19016, partitionId=27, source=[10.41.0.7]:6900 - 493b19ed-a58d-4508-b9ef-f5c58e05b859, sourceCurrentReplicaIndex=0, sourceNewReplicaIndex=1, destination=[10.41.0.7]:6901 - f12342bf-a37e-452a-8c67-1d36dd4dbac7, destinationCurrentReplicaIndex=-1, destinationNewReplicaIndex=0, master=[10.41.0.7]:6900, initialPartitionVersion=1, partitionVersionIncrement=2, status=ACTIVE}
com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.RuntimeException: ConcurrentRuntime not initialized
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:103)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:292)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.ScheduledRunnableAdapter.readData(ScheduledRunnableAdapter.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.TaskDefinition.readData(TaskDefinition.java:144)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.ScheduledTaskDescriptor.readData(ScheduledTaskDescriptor.java:208)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.scheduledexecutor.impl.operations.ReplicationOperation.readInternal(ReplicationOperation.java:87)
at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:750)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.internal.partition.ReplicaFragmentMigrationState.readData(ReplicaFragmentMigrationState.java:97)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
at com.hazelcast.internal.partition.operation.MigrationOperation.readInternal(MigrationOperation.java:249)
at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:750)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:205)
at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:346)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:437)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:166)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:136)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.RuntimeException: ConcurrentRuntime not initialized
at org.glassfish.concurrent.runtime.ConcurrentRuntime.getRuntime(ConcurrentRuntime.java:121)
at org.glassfish.concurrent.runtime.InvocationContext.readObject(InvocationContext.java:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:83)
at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:76)
at fish.payara.nucleus.hazelcast.PayaraHazelcastSerializer.read(PayaraHazelcastSerializer.java:84)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
... 50 more
]]
[2021-02-15T23:00:32.304+0800] [] [WARNING] [] [com.hazelcast.internal.partition.impl.MigrationManager] [tid: _ThreadID=160 _ThreadName=ForkJoinPool.commonPool-worker-6] [timeMillis: 1613401232304] [levelValue: 900] [10.41.0.7]:6900 [testClusterDev] [4.1] Migration failed: MigrationInfo{uuid=fc68e9ac-1081-4f9b-a70a-6fb0aae19016, partitionId=27, source=[10.41.0.7]:6900 - 493b19ed-a58d-4508-b9ef-f5c58e05b859, sourceCurrentReplicaIndex=0, sourceNewReplicaIndex=1, destination=[10.41.0.7]:6901 - f12342bf-a37e-452a-8c67-1d36dd4dbac7, destinationCurrentReplicaIndex=-1, destinationNewReplicaIndex=0, master=[10.41.0.7]:6900, initialPartitionVersion=1, partitionVersionIncrement=2, status=ACTIVE}
This seems to happen because the new node is not yet fully initialized (it has only just started). Compared to the next one, this exception looks less critical.
Exception 2: java.lang.NullPointerException: Failed to execute java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
[2021-02-15T23:44:19.544+0800] [] [SEVERE] [] [com.hazelcast.spi.impl.executionservice.ExecutionService] [tid: _ThreadID=35 _ThreadName=hz.elated_murdock.scheduled.thread-] [timeMillis: 1613403859544] [levelValue: 1000] [[
[10.4.0.7]:6901 [testClusterDev] [4.1] Failed to execute java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@55a27ce3
java.lang.NullPointerException
at org.glassfish.concurrent.runtime.ContextSetupProviderImpl.isApplicationEnabled(ContextSetupProviderImpl.java:326)
at org.glassfish.concurrent.runtime.ContextSetupProviderImpl.setup(ContextSetupProviderImpl.java:194)
at org.glassfish.enterprise.concurrent.internal.ContextProxyInvocationHandler.invoke(ContextProxyInvocationHandler.java:94)
at com.sun.proxy.$Proxy154.run(Unknown Source)
at com.hazelcast.scheduledexecutor.impl.ScheduledRunnableAdapter.call(ScheduledRunnableAdapter.java:56)
at com.hazelcast.scheduledexecutor.impl.TaskRunner.call(TaskRunner.java:78)
at com.hazelcast.scheduledexecutor.impl.TaskRunner.run(TaskRunner.java:104)
at com.hazelcast.spi.impl.executionservice.impl.DelegateAndSkipOnConcurrentExecutionDecorator$DelegateDecorator.run(DelegateAndSkipOnConcurrentExecutionDecorator.java:77)
at com.hazelcast.internal.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:217)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
]]
This exception occurs on the new node joining the cluster. It does not always happen; presumably Hazelcast tries to execute the event on the new node while it is still starting and fails because the environment is not yet fully initialized. The problem is that after two such failed attempts the event is unloaded by Hazelcast.
Implementation insights:
The method that schedules events via IScheduledExecutorService (it resides in an application-scoped bean inside the main application WAR):
@Resource
ContextService _ctxService;

public void scheduleClusteredEvent() {
    // _instance is the HazelcastInstance backing the Payara Micro data grid (see the sketch below)
    IScheduledExecutorService executorService = _instance.getScheduledExecutorService("default");
    ClusteredEvent ce = new ClusteredEvent(new DiagEvent(null, "TestEvent1"));
    // Wrap the event in a contextual proxy so that run() can later reach the main application's JNDI/CDI context
    Object ceProxy = _ctxService.createContextualProxy(ce, Runnable.class, Serializable.class);
    executorService.scheduleAtFixedRate((Runnable) ceProxy, 0, 3, TimeUnit.SECONDS);
}
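For reference, a minimal sketch of how the _instance field used above could be obtained; the post itself does not show this, so the surrounding bean and the JNDI lookup of Payara's embedded Hazelcast instance under the name payara/Hazelcast are assumptions on my side, not part of the original code:

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import com.hazelcast.core.HazelcastInstance;

@ApplicationScoped
public class ClusteredEventScheduler {

    // Hazelcast instance backing the Payara Micro data grid (assumed to be obtained via JNDI)
    private HazelcastInstance _instance;

    @PostConstruct
    void init() {
        try {
            // "payara/Hazelcast" is the JNDI name under which Payara exposes its embedded Hazelcast instance
            _instance = (HazelcastInstance) new InitialContext().lookup("payara/Hazelcast");
        } catch (NamingException e) {
            throw new IllegalStateException("Hazelcast instance is not available", e);
        }
    }
}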
The ClusteredEvent class (it resides in a separate JAR that is added to the Payara Micro classpath via the --addLibs argument). It needs to somehow notify the main application about the event to be fired, hence the use of BeanManager.fireEvent().
public class ClusteredEvent implements Runnable, Serializable {

    private final DiagEvent _event;

    public ClusteredEvent(DiagEvent event) {
        _event = event;
    }

    @Override
    public void run() {
        // For the sake of brevity - all null checks, error handling etc. were removed
        try {
            InitialContext ic = new InitialContext();
            // Fire the event through the main application's BeanManager so its CDI observers can react to it
            ((BeanManager) ic.lookup("java:comp/BeanManager")).fireEvent(_event);
        } catch (NamingException e) {
            throw new RuntimeException(e);
        }
    }
}
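On the receiving side, the main application can pick up the fired DiagEvent with an ordinary CDI observer method; a minimal sketch (the observer bean below is illustrative and not part of the original post):

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class DiagEventObserver {

    // Invoked by CDI whenever ClusteredEvent.run() fires a DiagEvent through the BeanManager
    public void onDiagEvent(@Observes DiagEvent event) {
        // react to the coordinated cluster event here, e.g. delegate to CDI/EJB services of the main application
    }
}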
So my questions are:
- How can the exceptions/problems above be resolved?
- Am I heading in the right direction to achieve coordinated clustered event behavior in a Payara Micro cluster? I expected this to be a simple out-of-the-box task, but it turns out to require some custom implementation because persistent timers do not work as needed. Is there another, more elegant way to achieve coordinated clustered event behavior with Payara Micro Cluster (>= v5.2021.1)?
Thank you very much!
Update 1:
Recall that the main purpose of this exercise is to provide coordinated timer (event) functionality in a Payara Micro cluster, so suggestions for more elegant solutions are very welcome.
Addressing the questions/suggestions from the comments:
Q1:
Why do you need to create a contextual proxy for the event object?
A1: Indeed, making a contextual proxy out of the plain ClusteredEvent() object is what adds the main complexity here and leads to the exceptions listed above (meaning: scheduling ClusteredEvent() without wrapping it in a contextual proxy works fine and causes no exceptions, but with one caveat).
The reason for using a contextual proxy is that I need some way to reach the main application from the unmanaged thread started by the IScheduledExecutorService. So far I have not found any other workable way to trigger any CDI/EJB bean of the main application from an unmanaged thread. Only making it contextual allows ClusteredEvent.run() to communicate with the main application, e.g. via the BeanManager.
Any suggestions on how to establish communication between an unmanaged thread and CDI/EJB beans running in a separate application (with both running on the same Payara Micro instance) are welcome.
Q2:
For example, you could wrap ceProxy into a Runnable that executes ceProxy.run() inside a try-catch block.
A2: I tried that, and it indeed helps with handling "Exception 2" mentioned above. I am posting the implementation of the ClusteredEventWrapper class below; the try/catch in its run() method handles "Exception 2".
Q3:
The first exception comes from Hazelcast trying to deserialize the proxy on the new instance, which fails because the proxy needs an initialized environment to be deserialized. To solve that, you need to wrap the ceProxy object and customize the wrapper's deserialization so that it waits until the ContextService is initialized.
A3: Adding a custom serialization/deserialization implementation to ClusteredEventWrapper indeed makes it possible to handle "Exception 1", but here I am still struggling to find the best way to do it. Deferring deserialization via Thread.sleep() leads to new (different) exceptions. Simply suppressing the exception still needs to be verified, but in that case I am afraid ClusteredEventWrapper will not be deserialized correctly on the new (starting) node, because Hazelcast will assume the sync went fine and will not try to sync it again (I might be wrong here, I still need to check this). For now it looks like Hazelcast retries the sync several times, so "Exception 1" eventually goes away.
The implementation of ClusteredEventWrapper, which wraps ClusteredEvent:
public class ClusteredEventWrapper implements Runnable, Serializable {

    private static final long serialVersionUID = 5878537035999797427L;

    private static final Logger LOG = Logger.getLogger(ClusteredEventWrapper.class.getName());

    private final Runnable _clusteredEvent;

    public ClusteredEventWrapper(Runnable clusteredEvent) {
        _clusteredEvent = clusteredEvent;
    }

    @Override
    public void run() {
        try {
            _clusteredEvent.run();
        } catch (Throwable e) {
            if (e instanceof NullPointerException
                    && e.getStackTrace() != null && e.getStackTrace().length > 0
                    && "org.glassfish.concurrent.runtime.ContextSetupProviderImpl".equals(e.getStackTrace()[0].getClassName())
                    && "isApplicationEnabled".equals(e.getStackTrace()[0].getMethodName())) {
                // Means we got "Exception 2" (posted above)
                LOG.log(Level.WARNING, "Skipping scheduled event execution on this node as this node is still being initialized...");
            } else {
                LOG.log(Level.SEVERE, "Error executing scheduled event", e);
            }
        }
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        LOG.log(Level.INFO, "1_WRITE_OBJECT...");
        out.defaultWriteObject();
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        LOG.log(Level.INFO, "2_READ_OBJECT...");
        int retry = 0;
        while (!readObjectInner(in) && retry < 5) { // This doesn't work well; need to think of some other way of handling it
            retry++;
            LOG.log(Level.INFO, "2_READ_OBJECT: retry {0}", retry);
            try {
                // We need to wait
                Thread.sleep(15000);
            } catch (InterruptedException ex) {
            }
        }
    }

    private boolean readObjectInner(ObjectInputStream in) throws IOException, ClassNotFoundException {
        try {
            in.defaultReadObject();
            return true;
        } catch (Throwable e) {
            if (e instanceof RuntimeException && "ConcurrentRuntime not initialized".equals(e.getMessage())) {
                // This means the node trying to deserialize this object is not yet ready
                return false;
            } else {
                // For all other exceptions - rethrow
                throw e;
            }
        }
    }
}
So now the event is scheduled as follows:
@Resource
ContextService _ctxService;

public void scheduleClusteredEvent() {
    IScheduledExecutorService executorService = _instance.getScheduledExecutorService("default");
    ClusteredEvent ce = new ClusteredEvent(new DiagEvent(null, "PersistentEvent1"));
    Object ceProxy = _ctxService.createContextualProxy(ce, Runnable.class, Serializable.class);
    // The contextual proxy is now additionally wrapped in ClusteredEventWrapper before being scheduled
    executorService.scheduleAtFixedRate(new ClusteredEventWrapper((Runnable) ceProxy), 0, 3, TimeUnit.SECONDS);
}