0

我必须在 XG 交易中生成一个唯一的发票编号,该交易在我的数据模型中包含以下3 个实体组:

  • (toplevel) ContactRoot <-- (ancestor) <--- Contact : 在交易过程中联系人必须更新为状态 Client

  • (toplevel) CustomerSettings : 保存下一个要使用的序列号;有一个且只有一个具有固定静态 ID 的 CustomerSettings 实例;交易过程中序号必须加1

  • (toplevel) InvoiceRoot <-- (ancestor) <--- Invoice : 根据 CustomerSettings 中的序列号分配新的唯一发票号;

这是 DAO 实现的重要部分(不相关的业务规则检查等已删除):

public void saveInvoice(final Invoice invoice) throws BusinessRuleException {

    final Objectify ofy = ObjectifyService.factory().begin().cache(true);
    ofy.transact(new Work<Void>() {

        @Override
        public Void run() {
            CustomerSettings customerSettings = ofy.load()
                    .key(Key.create(CustomerSettings.class, CustomerSettings.ID)).safeGet();
            Contact contact = ofy.load().key(createContactKey(invoice.getContactId()).safeGet();
            contact.setContactType(ContactType.CLIENT);
            ofy.save().entity(contact).now();
            String invoiceNumber = generateSequence(ofy, customerSettings);
            invoice.setInvoiceNumber(invoiceNumber);
            ofy.save().entity(invoice).now();
            return null;
        }
    });
}

以及生成下一个序列号的简化版本,其中下一个序列号为下一次调用而增加,并且必须以事务方式更新 CustomerSettings(我有这个同步但我想这不是真的有用):

private synchronized String generateSequence(Objectify ofy, CustomerSettings settings) {
    String ret = "";
    int sequence = settings.getNextSequence();
    settings.setNextSequence(sequence + 1);
    ofy.save().entity(settings).now();
    ret = "" + sequence;
    return ret;
}

这是我的单元测试对于可变线程数的样子:

private void test(final int threadCount) throws InterruptedException, ExecutionException {
    final Environment currentEnvironment = ApiProxy.getCurrentEnvironment();
    Callable<String> task = new Callable<String>() {
        @Override
        public String call() {
            ApiProxy.setEnvironmentForCurrentThread(currentEnvironment);
            return generateInvoiceNumber();
        }
    };
    List<Callable<String>> tasks = Collections.nCopies(threadCount, task);
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<Future<String>> futures = executorService.invokeAll(tasks);
    List<String> resultList = new ArrayList<String>(futures.size());
    // Check for exceptions
    for (Future<String> future : futures) {
        // Throws an exception if an exception was thrown by the task.
        resultList.add(future.get());
    }
    // Validate the IDs
    Assert.assertEquals(futures.size(), threadCount);
    List<String> expectedList = new ArrayList<String>(threadCount);
    for (long i = 1; i <= threadCount; i++) {
        expectedList.add("" + i);
    }
    Collections.sort(resultList);
    Assert.assertEquals(expectedList, resultList);
}

@SuppressWarnings("unchecked")
private String generateInvoiceNumber() {
    InvoiceDAO invoiceDAO = new InvoiceDAO();
    Invoice invoice = ... create a valid invoice
    invoiceDAO.saveInvoice(invoice);
    log.info("generated invoice number : " + invoice.getInvoiceNumber());
    return invoice.getInvoiceNumber();
}

例如,当我同时运行 32 个线程时:

@Test
public void test32() throws InterruptedException, ExecutionException {
    test(32);
}

但是后续线程看不到先前的交易增加了发票编号序列。

这是结果:

junit.framework.AssertionFailedError: 预期:<[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]> 但为:<[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3]>

我已经浏览了几次文档,无法弄清楚为什么这不起作用?

如果您在一个事务中访问多个实体组,则该事务为 XG 事务。如果您只访问一个,则不是。5 个 EG 的标准限制适用于所有交易。 客观化交易文件

我究竟做错了什么 ?

4

1 回答 1

1

这段代码使代码不是事务性的:

final Objectify ofy = ObjectifyService.factory().begin().cache(true);

ofy.transact(new Work<Void>() {

       ....
       ofy.save().entity(settings).now();   
       .... 

}

因为我重用了非事务性的 objectify 实例。要在事务 Work 中获取实例,您必须始终像这样询问实例:

ObjectifyService.ofy()

更多信息在这里的小组讨论中。

查看 的实现ObjectifyService,您可以看到新实例被推入/弹出堆栈/从堆栈中弹出;

除此之外,测试用例仍然没有运行.. 最好的测试方法大概是同时触发 http 请求;

于 2012-10-31T18:38:07.040 回答