我必须在 XG 交易中生成一个唯一的发票编号,该交易在我的数据模型中包含以下3 个实体组:
(toplevel) ContactRoot <-- (ancestor) <--- Contact : 在交易过程中联系人必须更新为状态 Client
(toplevel) CustomerSettings : 保存下一个要使用的序列号;有一个且只有一个具有固定静态 ID 的 CustomerSettings 实例;交易过程中序号必须加1
(toplevel) InvoiceRoot <-- (ancestor) <--- Invoice : 根据 CustomerSettings 中的序列号分配新的唯一发票号;
这是 DAO 实现的重要部分(不相关的业务规则检查等已删除):
public void saveInvoice(final Invoice invoice) throws BusinessRuleException {
final Objectify ofy = ObjectifyService.factory().begin().cache(true);
ofy.transact(new Work<Void>() {
@Override
public Void run() {
CustomerSettings customerSettings = ofy.load()
.key(Key.create(CustomerSettings.class, CustomerSettings.ID)).safeGet();
Contact contact = ofy.load().key(createContactKey(invoice.getContactId()).safeGet();
contact.setContactType(ContactType.CLIENT);
ofy.save().entity(contact).now();
String invoiceNumber = generateSequence(ofy, customerSettings);
invoice.setInvoiceNumber(invoiceNumber);
ofy.save().entity(invoice).now();
return null;
}
});
}
以及生成下一个序列号的简化版本,其中下一个序列号为下一次调用而增加,并且必须以事务方式更新 CustomerSettings(我有这个同步但我想这不是真的有用):
private synchronized String generateSequence(Objectify ofy, CustomerSettings settings) {
String ret = "";
int sequence = settings.getNextSequence();
settings.setNextSequence(sequence + 1);
ofy.save().entity(settings).now();
ret = "" + sequence;
return ret;
}
这是我的单元测试对于可变线程数的样子:
private void test(final int threadCount) throws InterruptedException, ExecutionException {
final Environment currentEnvironment = ApiProxy.getCurrentEnvironment();
Callable<String> task = new Callable<String>() {
@Override
public String call() {
ApiProxy.setEnvironmentForCurrentThread(currentEnvironment);
return generateInvoiceNumber();
}
};
List<Callable<String>> tasks = Collections.nCopies(threadCount, task);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<Future<String>> futures = executorService.invokeAll(tasks);
List<String> resultList = new ArrayList<String>(futures.size());
// Check for exceptions
for (Future<String> future : futures) {
// Throws an exception if an exception was thrown by the task.
resultList.add(future.get());
}
// Validate the IDs
Assert.assertEquals(futures.size(), threadCount);
List<String> expectedList = new ArrayList<String>(threadCount);
for (long i = 1; i <= threadCount; i++) {
expectedList.add("" + i);
}
Collections.sort(resultList);
Assert.assertEquals(expectedList, resultList);
}
@SuppressWarnings("unchecked")
private String generateInvoiceNumber() {
InvoiceDAO invoiceDAO = new InvoiceDAO();
Invoice invoice = ... create a valid invoice
invoiceDAO.saveInvoice(invoice);
log.info("generated invoice number : " + invoice.getInvoiceNumber());
return invoice.getInvoiceNumber();
}
例如,当我同时运行 32 个线程时:
@Test
public void test32() throws InterruptedException, ExecutionException {
test(32);
}
但是后续线程看不到先前的交易增加了发票编号序列。
这是结果:
junit.framework.AssertionFailedError: 预期:<[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]> 但为:<[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3]>
我已经浏览了几次文档,无法弄清楚为什么这不起作用?
如果您在一个事务中访问多个实体组,则该事务为 XG 事务。如果您只访问一个,则不是。5 个 EG 的标准限制适用于所有交易。 客观化交易文件
我究竟做错了什么 ?