确保特定进程在网格中只运行一次的建议方法是什么?
用例将是从远程源订阅单个多路复用流,以便跨网格更新数据。我们需要选择要订阅的节点,并在该节点失败时选择一个新的节点来订阅。
在 gridgain 中是否有任何预先构建的模式,或者它是否通过监听网格生命周期事件和分布式 CAS 操作的组合来解决?
另一个用例是必须永远运行的单例作业,在失败时迁移到新节点。
谢谢。
确保特定进程在网格中只运行一次的建议方法是什么?
用例将是从远程源订阅单个多路复用流,以便跨网格更新数据。我们需要选择要订阅的节点,并在该节点失败时选择一个新的节点来订阅。
在 gridgain 中是否有任何预先构建的模式,或者它是否通过监听网格生命周期事件和分布式 CAS 操作的组合来解决?
另一个用例是必须永远运行的单例作业,在失败时迁移到新节点。
谢谢。
从 GridGain 6.2.0-rc2 版本开始,GridGain 有几种领导选举的方式:
GridProjection.oldest()
将为您返回集群中最旧节点的动态投影。如果最旧的节点出于某种原因离开集群,则会自动选择下一个最旧的节点,因此用户可以继续使用最旧的节点而不会中断。cluster-singleton-service
、per-node-singleton-service
或per-cache-key-singleton-service
。这本身不是领导者选举,但它可能完全不需要领导者选举,例如,GridGain 将保证cluster-singleton-service
在任何时候网格中只有一个可用的服务实例。有关分布式服务的更多信息,请参阅分布式服务文档。
请检查下面的代码。我使用 GridGain 6.1.8 对其进行了测试。
import org.gridgain.grid.*;
import org.gridgain.grid.cache.GridCache;
import org.gridgain.grid.cache.GridCacheConfiguration;
import org.gridgain.grid.cache.GridCacheMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
public class GridGainMain {
public static void main(String[] args) throws GridException {
GridConfiguration config = new GridConfiguration();
// Give a name to your grid.
config.setGridName("MyCoolGrid");
// Configure the cache that will be used by the leader election algorithm.
GridCacheConfiguration leaderConf = new GridCacheConfiguration();
leaderConf.setName("leader");
leaderConf.setCacheMode(GridCacheMode.REPLICATED);
config.setCacheConfiguration(leaderConf);
// Start the grid!
try (Grid grid = GridGain.start(config)) {
// Get the local node.
final GridNode localNode = grid.localNode();
// Get the leader cache.
final GridCache<String, String> leaderCache = grid.cache("leader");
// Elect this member as the leader, if no other node was elected yet.
putIfAbsent("leader", localNode.id().toString(), leaderCache);
// ================================================
// Schedule the leader election algorithm.
// The leader election algorithm will elect the oldest grid node as the leader.
// ================================================
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
// Get the self ID.
final String selfId = localNode.id().toString();
// Get the cached leader ID.
final String cachedLeaderId = get("leader", leaderCache);
// Get all nodes.
List<GridNode> list = new ArrayList<>(grid.nodes());
// Sort all nodes by natural order.
list.sort((o1, o2) -> (int) (o1.order() - o2.order()));
// Get the ID of the oldest node, which is the leader ID.
final String leaderId = list.get(0).id().toString();
// If the leader ID is not equals to the cached leader ID,
if (!leaderId.equals(cachedLeaderId)) {
// Put the leader ID into cache.
put("leader", leaderId, leaderCache);
}
// If this node is the leader,
if (selfId.equals(leaderId)) {
// =====================================
// Do something! Only this grid node will execute this code.
// =====================================
}
System.out.println("### Self ID: " + selfId
+ ", Order: " + localNode.order()
+ ", Leader ID: " + leaderId);
}
},
// Schedule now.
0L,
// Run the algorithm once every five seconds.
TimeUnit.SECONDS.toMillis(5));
// Remove this in production.
sleep(1, TimeUnit.DAYS);
}
}
private static <T> T get(String key, GridCache<String, T> cache) {
try {
return cache.get(key);
} catch (GridException e) {
return null;
}
}
private static <T> T putIfAbsent(String key, T value, GridCache<String, T> cache) {
try {
return cache.putIfAbsent(key, value);
} catch (GridException e) {
return null;
}
}
private static <T> T put(String key, T value, GridCache<String, T> cache) {
try {
return cache.put(key, value);
} catch (GridException e) {
return null;
}
}
public static void sleep(long duration, TimeUnit unit) {
try {
unit.sleep(duration);
} catch (InterruptedException e) {
// Ignore.
}
}
}
您可以简单地获取集群中最旧的节点并在该节点上开始操作(Grid.nodes() 将返回网格中的所有节点)。您还应该在其他节点上订阅发现事件侦听器,并让下一个最旧的节点接管,以防最旧的节点发生故障。
要检查一个节点是最旧的还是节点,您可以使用 GridNode.order() 方法。具有最小顺序的节点将是最旧的。
要收听发现事件,您可以使用以下代码:
grid.events().localListen(new GridPredicate<GridEvent>() {
@Override public boolean apply(GridEvent event) {
System.out.println("Event: " + event.name());
return true;
}
}, GridEventType.EVTS_DISCOVERY);