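I am trying to implement the following behavior for cluster-coordinated events:

  1. The timer (event) is executed in only one thread/JVM of the Payara Micro cluster;
  2. If that node goes down, the timer (event) is executed on another node in the cluster.

From the Payara Micro guide:

Persistent timers are not coordinated across a Payara Micro cluster. They are always executed on an instance with the same name as the one that created the timer.

If that instance goes down, the timer will be recreated on another instance with the same name once it joins the cluster. Until then, the timer becomes inactive.

By this definition, persistent timers do not provide the coordinated behavior I need in a Payara Micro cluster.

So I am trying to use IScheduledExecutorService from Hazelcast, which looks like a perfect match.

Basically, the implementation with IScheduledExecutorService works well, except in the scenario where a new Payara Micro node is starting up and joining a cluster that already has events scheduled with IScheduledExecutorService. During that time, the following exceptions occur: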

Exception 1: java.lang.RuntimeException: ConcurrentRuntime not initialized

[2021-02-15T23:00:31.870+0800] [] [INFO] [] [fish.payara.nucleus.cluster.PayaraCluster] [tid: _ThreadID=63 _ThreadName=hz.angry_yalow.event-5] [timeMillis: 1613401231870] [levelValue: 800] [[
  Data Grid Status 
Payara Data Grid State: DG Version: 4 DG Name: testClusterDev DG Size: 2
Instances: {
 DataGrid: testClusterDev Name: testNode0 Lite: false This: true UUID: 493b19ed-a58d-4508-b9ef-f5c58e05b859 Address: /10.41.0.7:6900
 DataGrid: testClusterDev Lite: false This: false UUID: f12342bf-a37e-452a-8c67-1d36dd4dbac7 Address: /10.41.0.7:6901
}]]

[2021-02-15T23:00:32.290+0800] [] [WARNING] [] [com.hazelcast.internal.partition.operation.MigrationRequestOperation] [tid: _ThreadID=160 _ThreadName=ForkJoinPool.commonPool-worker-6] [timeMillis: 1613401232290] [levelValue: 900] [[
  [10.41.0.7]:6900 [testClusterDev] [4.1] Failure while executing MigrationInfo{uuid=fc68e9ac-1081-4f9b-a70a-6fb0aae19016, partitionId=27, source=[10.41.0.7]:6900 - 493b19ed-a58d-4508-b9ef-f5c58e05b859, sourceCurrentReplicaIndex=0, sourceNewReplicaIndex=1, destination=[10.41.0.7]:6901 - f12342bf-a37e-452a-8c67-1d36dd4dbac7, destinationCurrentReplicaIndex=-1, destinationNewReplicaIndex=0, master=[10.41.0.7]:6900, initialPartitionVersion=1, partitionVersionIncrement=2, status=ACTIVE}
com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.RuntimeException: ConcurrentRuntime not initialized
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:103)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:292)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
    at com.hazelcast.scheduledexecutor.impl.ScheduledRunnableAdapter.readData(ScheduledRunnableAdapter.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
    at com.hazelcast.scheduledexecutor.impl.TaskDefinition.readData(TaskDefinition.java:144)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
    at com.hazelcast.scheduledexecutor.impl.ScheduledTaskDescriptor.readData(ScheduledTaskDescriptor.java:208)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
    at com.hazelcast.scheduledexecutor.impl.operations.ReplicationOperation.readInternal(ReplicationOperation.java:87)
    at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:750)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
    at com.hazelcast.internal.partition.ReplicaFragmentMigrationState.readData(ReplicaFragmentMigrationState.java:97)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:567)
    at com.hazelcast.internal.partition.operation.MigrationOperation.readInternal(MigrationOperation.java:249)
    at com.hazelcast.spi.impl.operationservice.Operation.readData(Operation.java:750)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.readInternal(DataSerializableSerializer.java:160)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:106)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.read(DataSerializableSerializer.java:51)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:205)
    at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:346)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:437)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:166)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:136)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.RuntimeException: ConcurrentRuntime not initialized
    at org.glassfish.concurrent.runtime.ConcurrentRuntime.getRuntime(ConcurrentRuntime.java:121)
    at org.glassfish.concurrent.runtime.InvocationContext.readObject(InvocationContext.java:214)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:83)
    at com.hazelcast.internal.serialization.impl.defaultserializers.JavaDefaultSerializers$JavaSerializer.read(JavaDefaultSerializers.java:76)
    at fish.payara.nucleus.hazelcast.PayaraHazelcastSerializer.read(PayaraHazelcastSerializer.java:84)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:286)
    ... 50 more
]]

[2021-02-15T23:00:32.304+0800] [] [WARNING] [] [com.hazelcast.internal.partition.impl.MigrationManager] [tid: _ThreadID=160 _ThreadName=ForkJoinPool.commonPool-worker-6] [timeMillis: 1613401232304] [levelValue: 900] [10.41.0.7]:6900 [testClusterDev] [4.1] Migration failed: MigrationInfo{uuid=fc68e9ac-1081-4f9b-a70a-6fb0aae19016, partitionId=27, source=[10.41.0.7]:6900 - 493b19ed-a58d-4508-b9ef-f5c58e05b859, sourceCurrentReplicaIndex=0, sourceNewReplicaIndex=1, destination=[10.41.0.7]:6901 - f12342bf-a37e-452a-8c67-1d36dd4dbac7, destinationCurrentReplicaIndex=-1, destinationNewReplicaIndex=0, master=[10.41.0.7]:6900, initialPartitionVersion=1, partitionVersionIncrement=2, status=ACTIVE}

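This seems to happen because the new node is not yet fully initialized (it has only just started). This exception appears less important than the next one.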

Exception 2: java.lang.NullPointerException: Failed to execute java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask

[2021-02-15T23:44:19.544+0800] [] [SEVERE] [] [com.hazelcast.spi.impl.executionservice.ExecutionService] [tid: _ThreadID=35 _ThreadName=hz.elated_murdock.scheduled.thread-] [timeMillis: 1613403859544] [levelValue: 1000] [[
  [10.4.0.7]:6901 [testClusterDev] [4.1] Failed to execute java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@55a27ce3
java.lang.NullPointerException
        at org.glassfish.concurrent.runtime.ContextSetupProviderImpl.isApplicationEnabled(ContextSetupProviderImpl.java:326)
        at org.glassfish.concurrent.runtime.ContextSetupProviderImpl.setup(ContextSetupProviderImpl.java:194)
        at org.glassfish.enterprise.concurrent.internal.ContextProxyInvocationHandler.invoke(ContextProxyInvocationHandler.java:94)
        at com.sun.proxy.$Proxy154.run(Unknown Source)
        at com.hazelcast.scheduledexecutor.impl.ScheduledRunnableAdapter.call(ScheduledRunnableAdapter.java:56)
        at com.hazelcast.scheduledexecutor.impl.TaskRunner.call(TaskRunner.java:78)
        at com.hazelcast.scheduledexecutor.impl.TaskRunner.run(TaskRunner.java:104)
        at com.hazelcast.spi.impl.executionservice.impl.DelegateAndSkipOnConcurrentExecutionDecorator$DelegateDecorator.run(DelegateAndSkipOnConcurrentExecutionDecorator.java:77)
        at com.hazelcast.internal.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:217)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
]]

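This exception occurs on the new node that is joining the cluster. It does not always happen; presumably Hazelcast tries to execute the event on the new node while it is still starting, and fails because the environment is not yet fully initialized. The problem is that after two such failed attempts, Hazelcast unschedules the event.


Implementation insights:

The method that schedules the event with IScheduledExecutorService (it resides in an application-scoped bean in the main application WAR):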

@Resource
ContextService _ctxService;

public void sheduleClusteredEvent() {
    IScheduledExecutorService executorService = _instance.getScheduledExecutorService("default");
    ClusteredEvent ce = new ClusteredEvent(new DiagEvent(null, "TestEvent1"));
    Object ceProxy = _ctxService.createContextualProxy(ce, Runnable.class, Serializable.class);
    executorService.scheduleAtFixedRate((Runnable) ceProxy, 0, 3, TimeUnit.SECONDS);
}

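The ClusteredEvent class (it resides in a separate JAR that is added to the Payara Micro classpath via the --addLibs argument). It needs to somehow notify the main application about the event to be fired, hence the use of BeanManager.fireEvent():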

public class ClusteredEvent implements Runnable, Serializable {
    private final DiagEvent _event;
    public ClusteredEvent(DiagEvent event) {
        _event = event;
    }
    @Override
    public void run() {
        // For brevity, all null checks etc. were removed (ic is a javax.naming.Context, e.g. new InitialContext())
        ((BeanManager) ic.lookup("java:comp/BeanManager")).fireEvent(_event);
    }
}
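
The run() method above leaves out the creation of `ic` and the null checks. A minimal sketch of what a defensive lookup might look like, using only the JDK's JNDI classes. `JndiLookup` and `tryLookup` are hypothetical names that do not appear in the original code, and the result is returned as a plain `Object` so the sketch compiles without the CDI API:

```java
import java.util.Optional;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper; not part of the original post.
final class JndiLookup {
    private JndiLookup() {
    }

    /** Returns the object bound under the given JNDI name, or empty if it cannot be resolved. */
    static Optional<Object> tryLookup(String name) {
        try {
            return Optional.ofNullable(new InitialContext().lookup(name));
        } catch (NamingException | RuntimeException e) {
            // No naming context is available for this thread, or the name is not bound.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Outside a container there is no java:comp namespace, so the lookup yields empty.
        System.out.println(tryLookup("java:comp/BeanManager").isPresent());
    }
}
```

Inside run(), the caller could then cast the value to BeanManager when present and skip the tick when it is empty, instead of failing with an exception.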

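So my questions are:

  1. How can the exceptions/problems above be resolved?
  2. Am I going in the right direction to implement coordinated cluster event behavior in a Payara Micro cluster? I expected this to be a trivial out-of-the-box task, but it requires a custom implementation because persistent timers do not work as needed. Is there a more elegant way to achieve coordinated cluster event behavior with Payara Micro Cluster (>= v5.2021.1)?

Thank you very much!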


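Update 1:

As a reminder, the main goal of this exercise is to get coordinated timer (event) functionality in a Payara Micro cluster, so suggestions for more elegant solutions are very welcome.

Addressing the questions/suggestions from the comments:

Q1:

Why do you need to create a contextual proxy for the event object?

A1: Indeed, making a contextual proxy out of the plain ClusteredEvent() object is what adds the main complexity here and leads to the exceptions listed above (meaning: scheduling ClusteredEvent() without making a contextual proxy out of it works fine and causes no exceptions, but there is a caveat).

The reason for using a contextual proxy is that I need to somehow reach the main application from the unmanaged thread run by IScheduledExecutorService. So far I have not found any other workable way to trigger any CDI/EJB bean in the main application from an unmanaged thread. Only making it contextual allows ClusteredEvent.run() to communicate with the main application, for example through BeanManager.

Any suggestions on how to establish communication between an unmanaged thread and the CDI/EJB beans of a separate application (both running on the same Payara Micro instance) are welcome.

Q2:

For example, you could wrap ceProxy into a Runnable that executes ceProxy.run() inside a try/catch block.

A2: I tried that, and it does help with handling "Exception 2" mentioned above. I am posting the implementation of the ClusteredEventWrapper class below; the try/catch in its run() method handles "Exception 2".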
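
To illustrate why the try/catch matters, here is a minimal sketch that uses the JDK's ScheduledExecutorService as a stand-in for Hazelcast's scheduler (an assumption made so the example runs without a cluster). With the JDK scheduler, a periodic task that throws is not run again, which resembles the dropped event described in the question. `GuardedTask` and `GuardedTaskDemo` are hypothetical names:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: swallows and counts failures so the periodic schedule survives.
final class GuardedTask implements Runnable {
    private final Runnable delegate;
    private final AtomicInteger failures = new AtomicInteger();

    GuardedTask(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException e) {
            failures.incrementAndGet(); // real code would log the failure here
        }
    }

    int failures() {
        return failures.get();
    }
}

final class GuardedTaskDemo {
    /** Schedules a task that always throws and reports how many times it was invoked. */
    static int countRuns(boolean guarded, int waitMillis) {
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("node not ready");
        };
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        try {
            Runnable task = guarded ? new GuardedTask(failing) : failing;
            ses.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ses.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false, 300));
        System.out.println("guarded runs:   " + countRuns(true, 300));
    }
}
```

The unguarded task stops after its first failure, while the guarded one keeps being invoked on every tick.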

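Q3:

The first exception comes from Hazelcast trying to deserialize the proxy on the new instance, which fails because the proxy needs an initialized environment to be deserialized. To work around it, you would need to wrap the ceProxy object and customize the wrapper's deserialization to wait until the ContextService is initialized.

A3: Adding custom serialization/deserialization to ClusteredEventWrapper does make it possible to handle "Exception 1", but I am still struggling to find the best way of handling it. Postponing deserialization with Thread.sleep() leads to new (different) exceptions. Suppressing the exception is something I still need to check, but in that case I am afraid ClusteredEventWrapper would not be deserialized properly on the new (starting) node, because Hazelcast would consider the synchronization successful and would not try to synchronize it again (I may be wrong; this too is something I still need to check). At the moment it looks like Hazelcast tries to synchronize several times, but "Exception 1" is gone.

Implementation of ClusteredEventWrapper, which wraps ClusteredEvent: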

public class ClusteredEventWrapper implements Runnable, Serializable {

    private static final long serialVersionUID = 5878537035999797427L;
    private static final Logger LOG = Logger.getLogger(ClusteredEventWrapper.class.getName());

    private final Runnable _clusteredEvent;

    public ClusteredEventWrapper(Runnable clusteredEvent) {
        _clusteredEvent = clusteredEvent;
    }

    @Override
    public void run() {
        try {
            _clusteredEvent.run();
        } catch (Throwable e) {
            if (e instanceof NullPointerException
                    && e.getStackTrace() != null && e.getStackTrace().length > 0
                    && "org.glassfish.concurrent.runtime.ContextSetupProviderImpl".equals(e.getStackTrace()[0].getClassName())
                    && "isApplicationEnabled".equals(e.getStackTrace()[0].getMethodName())) {
                // Means we got the "Exception 2" (posted above)
                LOG.log(Level.WARNING, "Skipping scheduled event execution on this node as this node is still being initialized...");
            } else {
                LOG.log(Level.SEVERE, "Error executing scheduled event", e);
            }
        }
    }


    private void writeObject(ObjectOutputStream out) throws IOException {
        LOG.log(Level.INFO, "1_WRITE_OBJECT...");
        out.defaultWriteObject();
    }


    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        LOG.log(Level.INFO, "2_READ_OBJECT...");
        int retry = 0;
        while (!readObjectInner(in) && retry < 5) { // This does not work well; some other way of handling it is needed
            retry++;
            LOG.log(Level.INFO, "2_READ_OBJECT: retry {0}", retry);
            try {
                // We need to wait 
                Thread.sleep(15000);
            } catch (InterruptedException ex) {
            }
        }
    }


    private boolean readObjectInner(ObjectInputStream in) throws IOException, ClassNotFoundException {
        try {
            in.defaultReadObject();
            return true;
        } catch (Throwable e) {
            if (e instanceof RuntimeException && "ConcurrentRuntime not initialized".equals(e.getMessage())) {
                // This means the node that is trying to deserialize this object is not ready yet
                return false;
            } else {
                // For all other exceptions - we throw error
                throw e;
            }
        }
    }

}
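
Retrying inside readObject is awkward because the stream has already been partly consumed. One alternative, not taken from the original post, is to ship the delegate as an opaque byte array and deserialize it lazily inside run(), where a failure can simply skip the tick. The sketch below uses only JDK serialization; `LazyRunnable` and the test double `NeedsRuntime` are hypothetical names, and it assumes the classes in the payload are visible to the default ObjectInputStream class resolution on the executing node, which may not hold in Payara:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: the delegate travels as bytes, so the wrapper itself
// can always be deserialized, even on a node that is still starting.
final class LazyRunnable implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] payload;        // serialized delegate
    private transient Runnable delegate; // materialized on the first successful run

    LazyRunnable(Runnable serializableDelegate) {
        this.payload = toBytes(serializableDelegate);
    }

    @Override
    public void run() {
        try {
            if (delegate == null) {
                delegate = (Runnable) fromBytes(payload);
            }
            delegate.run();
        } catch (RuntimeException | IOException | ClassNotFoundException e) {
            // Node not ready or payload unreadable: skip this tick and retry on the next one.
            System.err.println("Skipping execution: " + e);
        }
    }

    static byte[] toBytes(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    /** Simulates shipping the wrapper to another node. */
    static LazyRunnable roundTrip(LazyRunnable task) {
        try {
            return (LazyRunnable) fromBytes(toBytes(task));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Test double: deserialization fails until the simulated runtime is ready.
final class NeedsRuntime implements Runnable, Serializable {
    private static final long serialVersionUID = 1L;
    static volatile boolean runtimeReady = false;
    static final AtomicInteger runs = new AtomicInteger();

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (!runtimeReady) {
            throw new IllegalStateException("runtime not initialized");
        }
    }

    @Override
    public void run() {
        runs.incrementAndGet();
    }
}
```

With this shape, the partition migration only has to deserialize a byte array, and the environment-dependent step is deferred to execution time. Whether Hazelcast and the contextual proxy cooperate with this in practice would still need to be tested.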

So the event is now scheduled as follows:

@Resource
ContextService _ctxService;

public void sheduleClusteredEvent() {
    IScheduledExecutorService executorService = _instance.getScheduledExecutorService("default");
    ClusteredEvent ce = new ClusteredEvent(new DiagEvent(null, "PersistentEvent1"));
    Object ceProxy = _ctxService.createContextualProxy(ce, Runnable.class, Serializable.class);
    executorService.scheduleAtFixedRate(new ClusteredEventWrapper((Runnable) ceProxy), 0, 3, TimeUnit.SECONDS);
}
1 Answer

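Below I am posting the solution I implemented, based on the suggestions @OndroMih made in the comments:

Excerpt 1:

...a better approach is to avoid wrapping the object into a context, and instead register the BeanManager in a global variable (singleton) at application startup. You can then retrieve it from ClusteredEvent.run() through a static method, e.g. Registry.getBeanManager(). This method must wait until the application has started and saved its BeanManager instance with Registry.setBeanManager().

and this one:

Excerpt 2:

Or it might be even better if you store a reference to a ManagedExecutorService instead of the BeanManager, execute the run method with that executor, and inject whatever you need.

@OndroMih, please post them as an answer and I will mark it as the accepted answer!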


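Before going into the implementation details, a few words about how our application is packaged. It consists of:

  1. A war file that is bundled into Payara Micro as an Uber jar, so we do not redeploy the application war; we start and stop the whole Payara Micro with the war deployed on it;
  2. A small library jar with a few classes, mainly for Hazelcast use, supplied to the Payara Micro Uber jar via the --addLibs argument to avoid ClassNotFoundExceptions when Hazelcast synchronizes objects in the DataGrid.

Now about the implementation that gives us the required behavior for clustered timers/events (see the first post):

I) Using ManagedExecutorService as suggested above does look more flexible, since it allows injecting any required objects into the clustered event, so I started with that approach. But for some reason I could not get anything injected. Because of limited time I left it for future investigation and switched to the next approach. I also provide sample code for this case at the end of this post.

II) So I switched to BeanManager.

  1. I ended up with the Registry singleton implemented as follows (all comments removed for brevity). This class lives in the small jar that is added to Payara Micro via the --addLibs argument: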
public final class Registry {

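    // volatile: written by the application startup thread, read by Hazelcast scheduler threads
    private volatile ManagedExecutorService _executorService;

    private volatile BeanManager _beanManager;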

    private Registry() {
    }

    public ManagedExecutorService getExecutorService() {
        return _executorService;
    }

    public void setExecutorService(ManagedExecutorService executorService) {
        _executorService = executorService;
    }

    public BeanManager getBeanManager() {
        return _beanManager;
    }

    public void setBeanManager(BeanManager beanManager) {
        _beanManager = beanManager;
    }

    public static Registry getInstance() {
        return InstanceHolder._instance;
    }

    private static class InstanceHolder {
        private static final Registry _instance = new Registry();
    }
}
  2. In the main application war we already had an AppListener class that listens for application deployment events, so we added the Registry population logic there:
public class AppListener implements SystemEventListener {

    ...

    @Resource
    private ManagedExecutorService _managedExecutorService;

    @Resource
    private BeanManager _beanManager;

    @Override
    public void processEvent(SystemEvent event) throws AbortProcessingException {
        try {

            if (event instanceof PostConstructApplicationEvent) {
                LOG.log(Level.INFO, ">> Application started");
                ...
                // Once app marked as started - populate global objects in the Registry
                Registry.getInstance().setExecutorService(_managedExecutorService);
                Registry.getInstance().setBeanManager(_beanManager);
            }
            
            ...
            
        } catch (Exception e) {
            LOG.log(Level.SEVERE, ">> Error processing event: " + event, e);
        }
    }
}
  3. The ClusteredEvent class, which is the one scheduled via IScheduledExecutorService.scheduleAtFixedRate(), also resides in the small jar and has the following implementation:
public final class ClusteredEvent implements NamedTask, Runnable, Serializable {

    ...

    private final MultiTenantEvent _event;

    public ClusteredEvent(MultiTenantEvent event) {
        if (event == null) {
            throw new NullPointerException("Event can not be null");
        }
        _event = event;
    }

    @Override
    public void run() {
        try {
            if (Registry.getInstance().getBeanManager() == null) {
                LOG.log(Level.WARNING, "Skipping timer execution - application not initialized yet...");
                return;
            }
            Registry.getInstance().getBeanManager().fireEvent(_event);
        } catch (Throwable e) {
            LOG.log(Level.SEVERE, "Error executing timer: " + _event, e);
        }
    }

    @Override
    public final String getName() {
        return _event.getName();
    }
}
  4. That is basically it. Scheduling is done with the following simple steps:
@Resource(lookup = "payara/Hazelcast")
private HazelcastInstance _instance;

_instance.getScheduledExecutorService("default").scheduleAtFixedRate(new ClusteredEvent(event), initialDelaySec, invocationPeriodSec, TimeUnit.SECONDS);

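So far all tests have gone well. I was concerned that the reference returned by Registry.getBeanManager() might go stale after a while because some enclosing context gets closed somewhere (I am not sure about the nature of the BeanManager reference), but tests show that the BeanManager reference is still valid after 1 day, so hopefully it will work fine.

Another concern (not really a problem, but a caveat to keep in mind) is that there is no control over which node IScheduledExecutorService fires the event on. For example, when the event is fired on a node that is not yet initialized (still starting up), the event is skipped. For our usage scenario this is acceptable, so for now we can live with these considerations.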
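
The skipped events follow from Registry returning null until the application has populated it. The suggestion quoted earlier mentions waiting instead of skipping; a minimal sketch of that variant follows, assuming a short bounded wait on the scheduler thread is acceptable. `AwaitableRef` is a hypothetical name, and the generic type `T` stands in for BeanManager so that the example needs only the JDK:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical generic holder; in the answer's setting T would be BeanManager.
final class AwaitableRef<T> {
    private final AtomicReference<T> value = new AtomicReference<>();
    private final CountDownLatch ready = new CountDownLatch(1);

    /** Called by the application once startup has finished. */
    void set(T v) {
        if (v == null) {
            throw new NullPointerException("value");
        }
        value.set(v);
        ready.countDown();
    }

    /** Waits up to the timeout; returns null if the value was not published in time. */
    T await(long timeout, TimeUnit unit) {
        try {
            return ready.await(timeout, unit) ? value.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return null;
        }
    }
}

final class AwaitableRefDemo {
    public static void main(String[] args) {
        AwaitableRef<String> ref = new AwaitableRef<>();
        // Simulates the application finishing startup on another thread.
        new Thread(() -> ref.set("bean-manager-placeholder")).start();
        System.out.println(ref.await(2, TimeUnit.SECONDS));
    }
}
```

A long timeout would block a Hazelcast worker thread on a starting node, so the wait should stay well below the scheduling period; skipping the tick, as the answer does, remains the simpler choice.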


Back to the ManagedExecutorService question: with that approach, ClusteredEvent looked as follows:

public class ClusteredEvent implements Runnable, Serializable {

    private final MultiTenantEvent _event;

    public ClusteredEvent(MultiTenantEvent event) {
        _event = event;
    }

    @Override
    public void run() {
        try {
            LOG.log(Level.INFO, "TIMER THREAD NAME: {0}", Thread.currentThread().getName());

            if (Registry.getInstance().getExecutorService() == null) {
                LOG.log(Level.WARNING, "Skipping timer execution - application not initialized yet...");
                return;
            }
            
            Registry.getInstance().getExecutorService().submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    LOG.log(Level.INFO, "Timer.Run() THREAD NAME: {0}", Thread.currentThread().getName());                    
                    String beanManagerJndiName = "java:comp/BeanManager";
                    try {
                        Context ic = new InitialContext();
                        BeanManager beanManager = (BeanManager) ic.lookup(beanManagerJndiName);
                        beanManager.fireEvent(_event);
                        return true;
                    } catch (NullPointerException | NamingException ex) {
                        LOG.log(Level.SEVERE, "ERROR: no BeanManager resource could be located by JNDI name: " + beanManagerJndiName, ex);
                        return false;
                    }
                }
            }).get();
        } catch (Throwable e) {
            LOG.log(Level.SEVERE, "Error executing timer: " + _event, e);
        }
    }
}

The output is as follows:

[2021-02-24 07:56:07] [INFO] [ua.appName.model.event.ClusteredEvent run]
 TIMER THREAD NAME: hz.competent_mccarthy.cached.thread-11

[2021-02-24 07:56:07] [INFO] [ua.appName.model.event.ClusteredEvent$1 call]
 Timer.Run() THREAD NAME: concurrent/__defaultManagedExecutorService-managedThreadFactory-Thread-1

[2021-02-24 07:56:07] [SEVERE] [ua.appName.model.event.ClusteredEvent$1 call]
 ERROR: no BeanManager resource could be located by JNDI name: java:comp/BeanManager
javax.naming.NamingException: Lookup failed for 'java:comp/BeanManager' in SerialContext[myEnv={java.naming.factory.initial=com.sun.enterprise.naming.impl.SerialInitContextFactory, java.naming.factory.url.pkgs=com.sun.enterprise.naming, java.naming.factory.state=com.sun.corba.ee.impl.presentation.rmi.JNDIStateFactoryImpl} [Root exception is javax.naming.NamingException: Invocation exception: Got null ComponentInvocation ]
    at com.sun.enterprise.naming.impl.SerialContext.lookup(SerialContext.java:496)
    at com.sun.enterprise.naming.impl.SerialContext.lookup(SerialContext.java:442)
    at javax.naming.InitialContext.lookup(InitialContext.java:417)
    at javax.naming.InitialContext.lookup(InitialContext.java:417)
    at ua.appName.model.event.ClusteredEvent$1.call(ClusteredEvent.java:70)
    at ua.appName.model.event.ClusteredEvent$1.call(ClusteredEvent.java:63)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.glassfish.enterprise.concurrent.internal.ManagedFutureTask.run(ManagedFutureTask.java:143)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    at org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl$ManagedThread.run(ManagedThreadFactoryImpl.java:250)
Caused by: javax.naming.NamingException: Invocation exception: Got null ComponentInvocation 
    at com.sun.enterprise.naming.impl.GlassfishNamingManagerImpl.getComponentId(GlassfishNamingManagerImpl.java:870)
    at com.sun.enterprise.naming.impl.GlassfishNamingManagerImpl.lookup(GlassfishNamingManagerImpl.java:737)
    at com.sun.enterprise.naming.impl.JavaURLContext.lookup(JavaURLContext.java:167)
    at com.sun.enterprise.naming.impl.SerialContext.lookup(SerialContext.java:476)
    ... 11 more

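So the line Timer.Run() THREAD NAME: concurrent/__defaultManagedExecutorService-managedThreadFactory-Thread-1 confirms that the code is already running inside a managed thread, yet I still cannot inject or look up anything. I have left this investigation for later.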
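
One plausible reading of the "Got null ComponentInvocation" message, offered as an assumption rather than something the post confirms, is that a managed executor propagates the context of the thread that submits the task. Here that thread is a Hazelcast worker, which has no application context to propagate. The toy model below is not Payara code: a ThreadLocal stands in for the component invocation context, and `ContextModel` with its methods is a hypothetical name:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model, not Payara code: a thread-local stands in for the component invocation context.
final class ContextModel {
    static final ThreadLocal<String> COMPONENT = new ThreadLocal<>();

    /** Captures the caller's context now and re-applies it around the task later. */
    static <T> Callable<T> contextual(Callable<T> task) {
        final String captured = COMPONENT.get(); // capture happens on the calling thread
        return () -> {
            String previous = COMPONENT.get();
            COMPONENT.set(captured);
            try {
                return task.call();
            } finally {
                COMPONENT.set(previous);
            }
        };
    }

    /** Simulated lookup of a java:comp name: it needs a component context. */
    static String lookup() {
        String component = COMPONENT.get();
        if (component == null) {
            throw new IllegalStateException("Got null ComponentInvocation");
        }
        return "BeanManager of " + component;
    }

    /** Submits lookup() from a thread that has (or lacks) an application context. */
    static String submitFrom(String callerContext) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            COMPONENT.set(callerContext);
            Callable<String> task = contextual(ContextModel::lookup);
            return pool.submit(task).get();
        } catch (ExecutionException e) {
            return "lookup failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            COMPONENT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitFrom("myApp")); // submitted from an "application" thread
        System.out.println(submitFrom(null));    // submitted from a thread without context
    }
}
```

If this reading is right, the thread name alone does not guarantee an application context; what matters is where the task was submitted from. That would be the first thing to check when resuming the investigation.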


Thanks again to @OndroMih for the suggestions on the implementation!

Thank you!

Answered 2021-03-04T02:07:16.490