我发现触发器不是线程安全的,因为数据库开始抛出不同的错误以对同一行进行并发更新:
- 行被另一个事务更新或删除(或未保存值映射不正确)
- 尝试获取锁时发现死锁;尝试重启事务
我试图引入行级锁定,但它根本不起作用。我相信锁被忽略或行根本没有被锁定
$ grep "ExecStart=" /usr/lib/systemd/system/mariadb.service
ExecStart=/usr/sbin/mariadbd --autocommit=0 --transaction-isolation=read-committed $MYSQLD_OPTS $_WSREP_NEW_CLUSTER $_WSREP_START_POSITION
- 自动提交被禁用
- 事务隔离更改为已提交读
SELECT what FROM tables WHERE conditions FOR UPDATE
尝试使用主键进行行级锁定
尝试了一个表级锁定等效解决方案,以及使用单个线程持久化数据,但它无法应对我拥有的数据量。
我采用的解决方案是提要处理与持久性的线程级分离,即多个线程处理传入的数据提要并为另一组线程创建实体对象以将它们保存在数据库中。这使我能够进行试验并为我的平台找到每个区域的最佳线程数,就像目前一样,我正在测试 8 个线程处理传入的提要并为另外 4 个负责将它们保存在数据库中的线程创建实体对象。对于持久化线程,我在应用层引入了一些智能隔离和自定义锁定实体集,以确保没有两个线程同时尝试更新同一行。这似乎可行,我现在只需要为这两个区域找到正确数量的线程。
这是为数据库编写者生成积压的消费者类
protected abstract Map<String, Set<ENTITY>> breakDownToBatchesForPersistance(Collection<ENTITY> localBacklog);
private void saveEntitiesInBatches(IDefaultEntityDAO<ENTITY> dao, Collection<ENTITY> localBacklog) {
for (Map.Entry<String, Set<ENTITY>> entry : breakDownToBatchesForPersistance(localBacklog).entrySet()) {
persister.saveAll(dao, entry.getKey(), entry.getValue());
}
}
这是数据库编写者的积压工作
private LinkedBlockingQueue<Key> keys;
private Map<Key, Set> backlog;
public <ENTITY> void saveAll(IDefaultEntityDAO<ENTITY> dao, String bucket, Set<ENTITY> entitySet) {
Key<ENTITY> key = Key.get(dao, bucket);
synchronized (key) {
synchronized (backlog) {
if (backlog.containsKey(key)) {
backlog.get(key).addAll(entitySet);
} else {
backlog.put(key, entitySet);
try {
keys.put(key);
} catch (InterruptedException ex) {
}
}
}
}
}
这是 DB writer 的核心
private void processDBBatchUpdate(Key key) {
synchronized (key) {
Set set;
synchronized (backlog) {
set = backlog.remove(key);
}
key.getDao().saveAll(set);
}
}
这是用于锁定的 Key 类
private IDefaultEntityDAO<ENTITY> dao;
private String bucket;
private static Map<IDefaultEntityDAO, Map<Object, Key>> keys = new HashMap<>();
private Key(IDefaultEntityDAO dao, String bucket) {
this.dao = dao;
this.bucket = bucket;
}
public static synchronized <ENTITY> Key<ENTITY> get(IDefaultEntityDAO<ENTITY> dao, String bucket) {
if (!keys.containsKey(dao)) {
keys.put(dao, new HashMap<>());
}
if (!keys.get(dao).containsKey(bucket)) {
keys.get(dao).put(bucket, new Key(dao, bucket));
}
return keys.get(dao).get(bucket);
}