0

我的应用程序处理大量实时数据(每天超过 2 亿),我需要实时汇总它们以保持报告性能。数据由服务器的多个线程提供并因此随机处理。

MariaDB 10.5.6-MariaDBInnoDB 10.5.6

  • 您知道触发器是否是线程安全的,即是否可能发生数据竞争。换句话说,当 1000 次更新 - 仅增量 - 在一秒钟内通过 10 个连接发生在单行中的相同列上时,数据不会被弄乱,结果将就像值是由单个串联连接相加的.

  • 您是否知道行级锁定的工作原理以及它是自动的还是可以手动强制执行的。

分享您的一些相关书签也将不胜感激,因为我在 google 中没有找到任何简洁有用的东西。

更新

我添加了一个插入后触发器,如果​​记录不存在,它会在报告表中创建一条新记录,然后使用更新语句更新列update table set field=value+delta where condition。数据库不喜欢它,并且正在发送数据的应用程序 - java,hibernate - 也无法承受它并开始抛出异常:

  • 这与休眠尝试插入的行完全无关,因为它没有尝试更新。显然它来自 MariaDB 触发器:Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)
  • 我不确定为什么会发生这种情况,但也得到了一些:Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
4

1 回答 1

0

我发现触发器不是线程安全的,因为数据库开始抛出不同的错误以对同一行进行并发更新:

  • 行被另一个事务更新或删除(或未保存值映射不正确)
  • 尝试获取锁时发现死锁;尝试重启事务

我试图引入行级锁定,但它根本不起作用。我相信锁被忽略或行根本没有被锁定

$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service 
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
  • 自动提交被禁用
  • 事务隔离更改为已提交读
  • SELECT what FROM tables WHERE conditions FOR UPDATE尝试使用主键进行行级锁定

尝试了一个表级锁定等效解决方案,以及使用单个线程持久化数据,但它无法应对我拥有的数据量。

我采用的解决方案是提要处理与持久性的线程级分离,即多个线程处理传入的数据提要并为另一组线程创建实体对象以将它们保存在数据库中。这使我能够进行试验并为我的平台找到每个区域的最佳线程数,就像目前一样,我正在测试 8 个线程处理传入的提要并为另外 4 个负责将它们保存在数据库中的线程创建实体对象。对于持久化线程,我在应用层引入了一些智能隔离和自定义锁定实体集,以确保没有两个线程同时尝试更新同一行。这似乎可行,我现在只需要为这两个区域找到正确数量的线程。

这是为数据库编写者生成积压的消费者类

    protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);

    private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
            for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
                persister.saveAll(dao, entry.getKey(), entry.getValue());
            }
    }

这是数据库编写者的积压工作

    private LinkedBlockingQueue<Key> keys;
    private Map<Key, Set> backlog;

    public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
        Key<ENTITY> key = Key.get(dao, bucket);
        synchronized (key) {
            synchronized (backlog) {
                if (backlog.containsKey(key)) {
                    backlog.get(key).addAll(entitySet);
                } else {
                    backlog.put(key, entitySet);
                    try {
                        keys.put(key);
                    } catch (InterruptedException ex) {
                    }
                }
            }
        }
    }

这是 DB writer 的核心

    private void processDBBatchUpdate(Key key) {
        synchronized (key) {
            Set set;
            synchronized (backlog) {
                set = backlog.remove(key);
            }

            key.getDao().saveAll(set);
        }
    }

这是用于锁定的 Key 类

    private IDefaultEntityDAO<ENTITY> dao;
    private String bucket;
    private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();

    private Key(IDefaultEntityDAO dao, String bucket) {
        this.dao = dao;
        this.bucket = bucket;
    }

    public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
        if (!keys.containsKey(dao)) {
            keys.put(dao, new HashMap<>());
        }

        if (!keys.get(dao).containsKey(bucket)) {
            keys.get(dao).put(bucket, new Key(dao, bucket));
        }

        return keys.get(dao).get(bucket);
    }
于 2020-11-24T12:21:19.893 回答