我们在 Spring 上下文中运行应用程序,使用 DataNucleus 作为我们的 ORM 映射和 mysql 作为我们的数据库。
我们的应用程序每天都会将一些数据输入到我们的数据库中。数据馈送的大小转化为大约 100 万行插入/更新。导入的性能开始时非常好,但随后会降低超时(随着执行的查询数量增加),并且在某些时候应用程序冻结或停止响应。在应用程序再次响应之前,我们将不得不等待整个工作完成。
这种行为对我们来说看起来很像内存泄漏,我们一直在努力查看我们的代码以发现任何潜在的问题,但是问题并没有消失。我们从堆转储中发现的一件有趣的事情是 org.datanucleus.ExecutionContextThreadedImpl(或 HashSet/HashMap)在导入期间占据了我们 90% 的内存(5GB)。(我附上了下面转储的屏幕截图)。我在互联网上的研究表明这个参考是 Level1 缓存(不确定我是否正确)。我的问题是在大型导入期间,我如何限制/控制一级缓存的大小。可能会要求 DN 在我的导入过程中不缓存?
如果那不是 L1 缓存,我的内存问题的可能原因是什么?
我们的代码为每个插入使用一个事务,以防止锁定数据库中的大量数据。每 2000 次插入调用一次 flush 方法
作为一项临时修复,我们将导入过程移动到在没有人使用我们的应用程序时在一夜之间运行。显然,这不可能永远持续下去。请有人至少指出我们正确的方向,以便我们可以做更多的研究,并希望我们能找到解决办法。
如果有人知道解码堆转储会很好
我们所有人都会非常感谢您的帮助。非常感谢!
https://s3-ap-southeast-1.amazonaws.com/public-external/datanucleus_heap_dump.png
https://s3-ap-southeast-1.amazonaws.com/public-external/datanucleus_dump2.png
下面的代码 - 此方法的调用者没有事务。此方法每次调用将处理一个导入对象,我们每天需要处理大约 100K 的这些对象
@Override
@PreAuthorize("(hasUserRole('ROLE_ADMIN')")
@Transactional(propagation = Propagation.REQUIRED)
public void processImport(ImportInvestorAccountUpdate account, String advisorCompanyKey) {
ImportInvestorAccountDescriptor invAccDesc = account
.getInvestorAccount();
InvestorAccount invAcc = getInvestorAccountByImportDescriptor(
invAccDesc, advisorCompanyKey);
try {
ParseReportingData parseReportingData = ctx
.getBean(ParseReportingData.class);
String baseCCY = invAcc.getBaseCurrency();
Date valueDate = account.getValueDate();
ArrayList<InvestorAccountInformationILAS> infoList = parseReportingData
.getInvestorAccountInformationILAS(null, invAcc, valueDate,
baseCCY);
InvestorAccountInformationILAS info = infoList.get(0);
PositionSnapshot snapshot = new PositionSnapshot();
ArrayList<Position> posList = new ArrayList<Position>();
Double totalValueInBase = 0.0;
double totalQty = 0.0;
for (ImportPosition importPos : account.getPositions()) {
Asset asset = getAssetByImportDescriptor(importPos
.getTicker());
PositionInsurance pos = new PositionInsurance();
pos.setAsset(asset);
pos.setQuantity(importPos.getUnits());
pos.setQuantityType(Position.QUANTITY_TYPE_UNITS);
posList.add(pos);
}
snapshot.setPositions(posList);
info.setHoldings(snapshot);
log.info("persisting a new investorAccountInformation(source:"
+ invAcc.getReportSource() + ") on " + valueDate
+ " of InvestorAccount(key:" + invAcc.getKey() + ")");
persistenceService.updateManagementEntity(invAcc);
} catch (Exception e) {
throw new DataImportException(invAcc == null ? null : invAcc.getKey(), advisorCompanyKey,
e.getMessage());
}
}