0

我在 AUTO_SERVER 模式下成功使用 H2 数据库,以便在网络上的多个桌面客户端之间透明地共享数据库文件。这样,在客户端和从 tcp 服务器读取的所有其他客户端中选出了一个服务器。

我缺少的是客户端或服务器如何通知所有其他桌面客户端数据库中的某些内容已更改。现在,我正在使用 JGroups 通道让所有客户端相互通信,但这是另一个故障点,也是另一个与 H2 并行运行的领导者选举算法。

难道没有其他方法了吗?我已经阅读了一些数据库支持的 JMS(Java 消息服务 Java API)。H2有什么提示吗?

谢谢

编辑

以下代码是对当前答案的改编,如果我首先启动 Sender(将 args 设置为“sender”),他作为服务器连接到 H2 数据库,然后我在远程机器中执行 Receiver(将 args 设置为“receiver”)和他们作为客户连接。

然而只有服务器接收到通知,客户端什么也接收不到。

从我目前所知道的情况来看,这是有道理的:仅在服务器上调用触发器,在客户端或服务器上调用从客户端或服务器调用的用户定义函数,但不会跨连接到数据库的所有客户端(和服务器)调用。

那么有没有办法调整以下内容以通知所有连接的实例数据库中的更改?

import java.io.File;
import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDB2
{

    public static void main(String[] args) throws Exception
    {
        //final String url = "jdbc:h2:mem:test;multi_threaded=true";
        final String url = "jdbc:h2:" + File.separator + "mnt/testdir/PlanIGS" + File.separator
                + "persondb;create=true;AUTO_SERVER=TRUE;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();

        boolean isSender = false;
        args = new String[]
        {
            "sender"
        };
        for (String arg : args)
        {
            if (arg.contains("receiver"))
            {
                System.out.println("receiver starting");
                isSender = false;
            }
            else if (arg.contains("sender"))
            {
                System.out.println("sender starting");
                isSender = true;
            }
        }

        if (isSender)
        {
            stat.execute("create alias wait_for_change for \""
                    + TestSimpleDB2.class.getName()
                    + ".waitForChange\"");
            stat.execute("create table test(id identity)");
            stat.execute("create trigger notifier "
                    + "before insert, update, delete, rollback "
                    + "on test call \""
                    + TestSimpleDB2.Notifier.class.getName() + "\"");

            Thread.sleep(1000);
            for (int i = 0; i < 10; i++)
            {
                System.out.println("Sender: I change something...");
                stat.execute("insert into test values(null)");
                Thread.sleep(2000);
            }
        }
        else
        {
            new Thread()
            {
                public void run()
                {
                    try
                    {
                        Connection conn = DriverManager.getConnection(url);
                        for (int i = 0; i < 10; i++)
                        {
                            conn.createStatement().execute(
                                    "call wait_for_change(100000)");
                            System.out.println("Receiver: event received");
                        }
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        conn.close();
    }

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis)
    {
        synchronized (modCount)
        {
            try
            {
                modCount.wait(maxWaitMillis);
            }
            catch (InterruptedException e)
            {
                // ignore
            }
        }
    }

    public static class Notifier extends TriggerAdapter
    {

        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException
        {
            modCount.incrementAndGet();
            synchronized (modCount)
            {
                modCount.notifyAll();
            }
        }
    }
}
4

1 回答 1

2

H2 没有实现 JMS(事实上我不知道有哪个数据库可以实现)。但是,您可以使用触发器和用户定义的函数在 H2 中构建简单的通知机制,如下所示。请注意,这需要 H2 中的多线程模式,该模式尚未完全测试。因此,使用单独的数据库进行消息传递而不是用于数据的数据库可能更有意义。

import java.sql.*;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.tools.TriggerAdapter;

public class TestSimpleDb {

    public static void main(String[] args) throws Exception {
        final String url = "jdbc:h2:mem:test;multi_threaded=true";
        Connection conn = DriverManager.getConnection(url);
        Statement stat = conn.createStatement();
        stat.execute("create alias wait_for_change for \"" + 
                TestSimpleDb.class.getName() + 
                ".waitForChange\"");
        stat.execute("create table test(id identity)");
        stat.execute("create trigger notifier " + 
                "before insert, update, delete, rollback " +
                "on test call \"" + 
                TestSimpleDb.Notifier.class.getName() + "\"");
        new Thread() {
            public void run() {
                try {
                    Connection conn = DriverManager.getConnection(url);
                    for (int i = 0; i < 10; i++) {
                        conn.createStatement().execute(
                                "call wait_for_change(10000)");
                        System.out.println("Receiver: event received");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        Thread.sleep(500);
        for (int i = 0; i < 10; i++) {
            System.out.println("Sender: I change something...");
            stat.execute("insert into test values(null)");
            Thread.sleep(1000);
        }
        conn.close();
    }

    static AtomicLong modCount = new AtomicLong();

    public static void waitForChange(long maxWaitMillis) {
        synchronized (modCount) {
            try {
                modCount.wait(maxWaitMillis);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    public static class Notifier extends TriggerAdapter {
        public void fire(Connection conn, ResultSet oldRow, ResultSet newRow)
                throws SQLException {
            modCount.incrementAndGet();
            synchronized (modCount) {
                modCount.notifyAll();
            }
        }
    }

}
于 2013-10-10T15:56:25.030 回答