
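I am trying to build a leader election protocol with JGroups, so that N instances of my program can elect a master and every client can learn that master's IP address. The current implementation relies, more or less, on each instance trying to acquire a lock on a lock channel: the instance that succeeds becomes the master, and all other instances switch to being clients.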

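import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.jgroups.*;
import org.jgroups.blocks.locking.LockService;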

public class AutoDiscovery
{
    static org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AutoDiscovery.class); //used for logging purposes (see log4j library)
    /* this variable indicates whether I have become the master or I'm just a client*/
    public final AtomicBoolean becomeMaster = new AtomicBoolean(false);
    /* The address of the server if we are a client or of ourself if we are
     * server */
    public volatile String serverAddress; // written by the acquiring/listening threads, read by others
    /* A channel on which to acquire a lock, so that only one can become server */
    private JChannel lockChannel;
    /* A shared channel for communication between client and master */
    private JChannel communicationChannel;
    private LockService lockService;
    /* A thread which tries to acquire a lock */
    private Thread acquiringThread;
    /* A thread which listens for the server ip which may change */
    private Thread listeningThread;
    /* A thread which lists the status and initializes the acquiring thread*/
    private Thread statusThread;
    private String name;
    /* If we pass from being a client to being a server we must stop the listening
     * thread however we cannot call listeningThread.stop() but instead we change
     * the stopListening boolean to true */
    private volatile boolean stopListening = false; // volatile: set by the acquiring thread, read by the listening thread
    /* This lock communicates I have finally become either master or client so
     * the serverAddress and becomeMaster variables are correctly set */
    public final Object finishedLock = new Object();

    public static void main(String[] args) throws Exception
    {
        Thread.currentThread().setName("MyMainThread");
        Random rand = new Random();

        AutoDiscovery master = new AutoDiscovery("Node" + rand.nextInt(10));

        master.lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
        master.lockChannel.connect("lock-channel");

        master.communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
        master.communicationChannel.connect("communication-channel");

        master.lockService = new LockService(master.lockChannel);
        master.startStatusPrinterThread();
    }

    public AutoDiscovery(String name)
    {
        this.name = name;
    }

    public AutoDiscovery()
    {
        try
        {
            Thread.currentThread().setName("MyMainThread");
            Random rand = new Random();

            this.name = ("Node" + rand.nextInt(10));

            lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
            lockChannel.connect("lock-channel");

            communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
            communicationChannel.connect("communication-channel");

            lockService = new LockService(lockChannel);
            startStatusPrinterThread();
        }
        catch (Exception ex)
        {
            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void startAcquiringThread()
    {
        acquiringThread = new Thread()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    //if you have become Master send your ip every now and then
                    if (becomeMaster.get())
                    {
                        try
                        {
                            communicationChannel.send(new Message(null, null, "serverip " + serverAddress));
                        }
                        catch (Exception ex)
                        {
                            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
                        }
                    }
                    else
                    {
                        try
                        {
                            Thread.currentThread().setName(name + "AcquiringThread");
                            Lock lock = lockService.getLock("serverLock");
                            if (lock.tryLock(4, TimeUnit.SECONDS))
                            {
                                becomeMaster.set(true);
                                stopListening = true;
                                /* Now that I'm server I must find out my own ip address on which to listen */
                                Enumeration<NetworkInterface> networkInterfaces;
                                try
                                {
                                    networkInterfaces = NetworkInterface.getNetworkInterfaces();
                                    for (NetworkInterface netint : Collections.list(networkInterfaces))
                                    {
                                        Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
                                        for (InetAddress inetAddress : Collections.list(inetAddresses))
                                        {
                                            if (isIPAddress(inetAddress.getHostAddress())
                                                    && !inetAddress.getHostAddress().equals("127.0.0.1"))
                                            {
                                                serverAddress = inetAddress.getHostAddress();
                                            }
                                        }
                                    }
                                    /* I notify to the rest of the program I have correctly initialized 
                                     * becomeMaster and serverAddress */
                                    synchronized (finishedLock)
                                    {
                                        finishedLock.notify();
                                    }
                                }
                                catch (Exception e)
                                {
                                    Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, e);
                                    System.exit(0);
                                }
                                log.info(Thread.currentThread().getName() + ": I acquired lock! will become master! my ip is " + serverAddress);
                            }
                            else
                            {
                                becomeMaster.set(false);
                                stopListening = false;
                                if (listeningThread == null || !listeningThread.isAlive())
                                {
                                    if (!stopListening) //??? this condition might be useless
                                    {
                                        startListeningThread();
                                    }
                                }
                            }
                        }
                        catch (Exception e)
                        {
                            e.printStackTrace();
                        }
                    }
                    try
                    {
                        sleep(5000L);
                    }
                    catch (InterruptedException ex)
                    {
                        Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };
        acquiringThread.setDaemon(true);
        acquiringThread.start();
    }

    public void startListeningThread()
    {
        listeningThread = new Thread()
        {
            @Override
            public void run()
            {
                try
                {
                    while (true)
                    {
                        Thread.currentThread().setName(name + "ListeningThread");
                        communicationChannel.setReceiver(new ReceiverAdapter()
                        {
                            @Override
                            public void receive(Message msg)
                            {
                                if (msg.getObject() != null)
                                {
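                                    /* Only messages of the form "serverip <address>" carry the master's ip;
                                     * anything else yields an empty string and is discarded below */
                                    String text = msg.getObject().toString();
                                    String leaderServerAddress = text.startsWith("serverip ") ? text.substring(9) : "";
                                    if (isIPAddress(leaderServerAddress))
                                    {
                                        serverAddress = leaderServerAddress;
                                        log.info(name + " Master server has ip " + serverAddress);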
                                        /* I notify to the rest of the program I have correctly initialized 
                                         * becomeMaster and serverAddress */
                                        synchronized (finishedLock)
                                        {
                                            finishedLock.notify();
                                        }
                                    }
                                    else
                                    {
                                        log.info(name + ": discarded message " + msg.getObject().toString());
                                    }
                                }
                            }
                        });
                        sleep(10000L);
                        if (stopListening)
                        {
                            return;
                        }
                    }
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        };
        listeningThread.setDaemon(true);
        listeningThread.start();
    }

    private void startStatusPrinterThread()
    {
        statusThread = new Thread()
        {
            @Override
            public void run()
            {
                Thread.currentThread().setName(name + "StatusPrinterThread");
                startAcquiringThread();
                while (true)
                {
                    try
                    {
                        if (becomeMaster.get())
                        {
                            log.info(name + " startStatusPrinterThread(): I am happily a Master!");
                        }
                        else
                        {
                            if (!acquiringThread.isAlive())
                            {
                                startAcquiringThread();
                            }
                        }
                        sleep(5000L);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };
        statusThread.setDaemon(true);
        statusThread.start();
    }

    private static boolean isIPAddress(String str)
    {
        Pattern ipPattern = Pattern.compile("^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
        return ipPattern.matcher(str).matches();
    }
}
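The address lookup in the acquiring thread keeps the last address that matches the IPv4 pattern, which may not be the one other machines can reach. As a minimal sketch, using only the JDK, the same lookup can rely on the `InetAddress` predicates instead of a regular expression. The class name `LocalAddressFinder` and the choice to return the first match are assumptions made for illustration, not part of the original program.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

// Hypothetical helper: not part of the original AutoDiscovery class.
public class LocalAddressFinder {

    /** Returns the first IPv4 address found on an interface that is up and not loopback. */
    public static Optional<String> firstNonLoopbackIPv4() {
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            if (interfaces == null) {
                return Optional.empty();
            }
            for (NetworkInterface nic : Collections.list(interfaces)) {
                // Skip interfaces that are down or that are the loopback device.
                if (!nic.isUp() || nic.isLoopback()) {
                    continue;
                }
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    // Keep IPv4 only, and skip loopback and link-local (169.254.x.x) addresses.
                    if (addr instanceof Inet4Address
                            && !addr.isLoopbackAddress()
                            && !addr.isLinkLocalAddress()) {
                        return Optional.of(addr.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            // Interface enumeration failed; report "no address" to the caller.
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println("Candidate server address: "
                + firstNonLoopbackIPv4().orElse("<none found>"));
    }
}
```

On a machine with several network cards this still picks an arbitrary one, so a configured address may be the safer choice when the LAN interface is known in advance.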

My current udp.xml is:

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
    <UDP
        mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         level="WARN"
         log_discard_msgs="false"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:8}"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK exponential_backoff="300"
                   xmit_stagger_timeout="200"
                   use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <UFC max_credits="2M"
         min_threshold="0.4"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />
    <CENTRAL_LOCK />
    <!-- pbcast.FLUSH  /-->
</config> 

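This approach works when I run N instances of the program on the same machine (N-1 members become clients and one becomes the master). When I run it on two different machines connected to the same LAN, it appears that after each member calls JChannel.connect() with the same cluster name, each member creates its own channel and no common cluster is formed. As a result, when messages are sent to the clients, the other master sees a different physical address for the same cluster name, and all messages are dropped.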

So I get warnings like the following:

7683 [Incoming-1,communication-channel,pc-home-41714] WARN org.jgroups.protocols.pbcast.NAKACK  - [JGRP00011] pc-home-41714: dropped message 293 from non-member cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d (view=[pc-home-41714|0] [pc-home-41714])

1207996 [TransferQueueBundler,communication-channel,pc-home-5280] WARN org.jgroups.protocols.UDP  - pc-home-5280: no physical address for cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d, dropping message
1209526 [TransferQueueBundler,lock-channel,pc-home-59082] WARN org.jgroups.protocols.UDP  - pc-home-59082: no physical address for efbe6408-0e21-d119-e2b8-f1d5762d9b45, dropping message
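The view printed in the first warning, `[pc-home-41714|0] [pc-home-41714]`, contains a single member, which is consistent with each machine having formed its own one-node cluster. One possible cause, stated here as an assumption rather than a diagnosis, is that multicast discovery traffic does not reach the other machine. The sketch below uses only the JDK to list the interfaces that are up and report multicast support, as a first check on each host. The class name `MulticastInterfaceReport` is hypothetical.

```java
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic helper: not part of JGroups or of the original program.
public class MulticastInterfaceReport {

    /** Returns the names of interfaces that are up, not loopback, and support multicast. */
    public static List<String> multicastCapableInterfaces() {
        List<String> names = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();
            if (all == null) {
                return names;
            }
            for (NetworkInterface nic : Collections.list(all)) {
                if (nic.isUp() && !nic.isLoopback() && nic.supportsMulticast()) {
                    names.add(nic.getName());
                }
            }
        } catch (SocketException e) {
            // Enumeration failed; return whatever was collected so far.
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Multicast-capable interfaces: " + multicastCapableInterfaces());
    }
}
```

An empty list on either machine would point at the host's network setup rather than at the JGroups configuration. A non-empty list does not prove that multicast packets pass through the switch or firewall between the two hosts.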

If I change loopback="true" to loopback="false" in udp.xml, they all connect to the same cluster, but then I get errors like the following:

55539 [Node0StatusPrinterThread] INFO plarz.net.planningig.autodiscovery.AutoDiscovery  - Node0 startStatusPrinterThread(): I am happily a Master!
59077 [TransferQueueBundler,lock-channel,pc-test-6919] ERROR org.jgroups.protocols.UDP  - pc-test-6919: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:43109 (130 bytes):, cause: java.io.IOException: Network is unreachable
59505 [TransferQueueBundler,communication-channel,pc-test-35303] ERROR org.jgroups.protocols.UDP  - pc-test-35303: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:55053 (139 bytes):, cause: java.io.IOException: Network is unreachable
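The destination in both errors, `fe80:0:0:0:226:18ff:fece:6ccc`, is an IPv6 link-local address. The sketch below, using only the JDK, classifies an address literal so that the same check can be applied to other destinations that appear in the logs. The class name `DestinationAddressCheck` is hypothetical. A commonly suggested remedy for this kind of error, given here as an untested assumption for this setup, is to start the JVM with `-Djava.net.preferIPv4Stack=true` or to set `bind_addr` on the `UDP` element to the machine's LAN IPv4 address.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper for reading log output: not part of the original program.
public class DestinationAddressCheck {

    /** Describes an IP literal as "<family> <scope>", e.g. "IPv6 link-local". */
    public static String describe(String literal) {
        try {
            // For an IP literal this parses the text and performs no DNS lookup.
            InetAddress addr = InetAddress.getByName(literal);
            String family = (addr instanceof Inet6Address) ? "IPv6" : "IPv4";
            String scope;
            if (addr.isLoopbackAddress()) {
                scope = "loopback";
            } else if (addr.isLinkLocalAddress()) {
                scope = "link-local";
            } else {
                scope = "other";
            }
            return family + " " + scope;
        } catch (UnknownHostException e) {
            return "unparseable";
        }
    }

    public static void main(String[] args) {
        // The destination taken from the error messages above.
        System.out.println(describe("fe80:0:0:0:226:18ff:fece:6ccc"));
        // An ordinary private LAN address, for comparison.
        System.out.println(describe("192.168.1.10"));
    }
}
```

A link-local IPv6 address is only valid on one network segment and needs an interface scope to be routed, which fits the "Network is unreachable" message.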

1 Answer


The error:

[Server:ha-server-3] 13:59:13,122 WARNING [org.jgroups.protocols.UDP] (OOB-15,null) null: no physical address for 766de5c9-8ac2-6d30-89ef-78d39aa5f7eb, dropping message

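In my case, this happened because there were multiple JBoss clusters on the same network and the servers in each cluster had the same names. For example, ha-server-1 and ha-server-2 existed in two different clusters on different machines.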

Cluster 1 (10.10.10.10):
|
+- ha-server-1
+- ha-server-2

Cluster 2 (10.10.10.20):
|
+- ha-server-1
+- ha-server-2

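I solved the problem by changing the ha-server names. Note that the two are independent clusters. I think this happens because of how JGroups uses multicast. Any further explanation from people with more JGroups experience would be welcome.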

Reference: http://icfun.blogspot.com/2013/10/no-physical-address-for-766de5c9-8ac2.html
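The fix described above amounts to giving every node a name that no other node on the network uses. In the question's code the node name comes from `"Node" + rand.nextInt(10)`, which allows only ten distinct values. A minimal sketch of a name generator that is far less likely to collide, using only the JDK, could look like this. The class name `UniqueNodeName` and the name format are assumptions. Passing the result to the channel before connecting, for example through `JChannel.setName(String)`, assumes that the JGroups version in use provides that method.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

// Hypothetical helper: not part of JGroups or of the original program.
public class UniqueNodeName {

    /** Builds a name of the form "<prefix>-<hostname>-<8 hex characters>". */
    public static String create(String prefix) {
        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Fall back to a fixed token; the random suffix still separates nodes.
            host = "unknown-host";
        }
        // The first 8 characters of a random UUID are hex digits.
        String suffix = UUID.randomUUID().toString().substring(0, 8);
        return prefix + "-" + host + "-" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(create("Node"));
        System.out.println(create("Node"));
    }
}
```

Including the host name keeps the log output readable, while the random suffix separates several instances started on the same machine.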

answered 2014-05-14T11:15:59.903