0

我正在学习事务如何在 gridgain 中工作并面临下一个问题:即使使用 SERIALIZABLE 隔离级别,乐观模式也不能保证一致性。这是显示问题的示例单元测试:

@Before
public void init() throws GridException {
    GridCache<Object, Object> cache = grid.cache("cache");
    accounts = 1;
    threads = 2;
    accountVersions = cache.projection(Integer.class, Long.class);
    for (int i = 0; i < accounts; i++) {
        accountVersions.putx(i, 0L);
    }
}

@Test
public void testOptimisticTransaction() throws InterruptedException, GridException {
    testWithTransactionSetting(GridCacheTxConcurrency.OPTIMISTIC, GridCacheTxIsolation.SERIALIZABLE);
}

private void testWithTransactionSetting(GridCacheTxConcurrency txConcurrency, GridCacheTxIsolation txIsolation) throws InterruptedException, GridException {
    //given
    this.txConcurrency = txConcurrency;
    this.txIsolation = txIsolation;

    final int operation = 20;
    final int operationsPerThread = operation / threads;

    final SortedSetMultimap<Integer, Long> map = TreeMultimap.create();
    final CountDownLatch latch = new CountDownLatch(threads);
    class Job implements Runnable {
        private SortedSetMultimap<Integer, Long> completedOperations = TreeMultimap.create();

        @Override
        public void run() {
            for (int j = 0; j < operationsPerThread; j++) {
                int accountId = 0;
                try {
                    long version = makeOperation(accountId);
                    assertNotSame("Version can't be -1", -1, version);
                    completedOperations.put(accountId, version);
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            }
            latch.countDown();
        }
    }
    List<Job> jobs = new ArrayList<>();

    //when
    for (int i = 0; i < threads; i++) {
        Job job = new Job();
        jobs.add(job);
        new Thread(job).start();
    }

    latch.await();
    for (Job job : jobs) {
        System.out.println(job.completedOperations);
        for (int accountId : job.completedOperations.keys()) {
            Sets.SetView intersection = Sets.intersection(job.completedOperations.get(accountId), map.get(accountId));
            assertTrue("makeOperation returns not unique versions\n Some of versions are already used for accountID: " + accountId + ";intersection: " + intersection, intersection.isEmpty());
        }
        map.putAll(job.completedOperations);
    }

    //then
    long totalInCache = 0;
    long totalInClient = 0;
    for (int i = 0; i < accounts; i++) {
        totalInCache += accountVersions.get(i);
        totalInClient += map.get(i).size();
    }
    System.out.printf("Operations in cache: %d; in client: %d\n", totalInCache, totalInClient);
    for (int i = 0; i < accounts; i++) {
        long version = accountVersions.get(i);
        SortedSet<Long> receivedVersions = map.get(i);
        SortedSet<Long> versionsInCache = new TreeSet<>();
        for (long j = 1; j <= version; j++) {
            versionsInCache.add(j);
        }
        assertEquals("versions for account: " + i + " are not matched in cache and in client: ", versionsInCache, receivedVersions);
    }
}

private long makeOperation(int accountId) {
    for (int i = 0; i < MAX_RETRIES; i++) {
        try (GridCacheTx tx = accountVersions.txStart(txConcurrency, txIsolation)) {
            long currentVersion = accountVersions.get(accountId);
            accountVersions.put(accountId, 1 + currentVersion);
            tx.commit();
            return currentVersion + 1;
        } catch (GridException e) {
            if (i == MAX_RETRIES - 1) {
                throw new RuntimeException(e);
            }
        }
    }
    return -1;
}

配置.xml:

<bean id="grid.cfg" scope="singleton" class="org.gridgain.grid.GridConfiguration">

    <property name="localHost" value="127.0.0.1"/>

    <property name="cacheConfiguration">
        <list>
            <!-- Partitioned cache example configuration (Atomic mode). -->
            <bean parent="cache-template">
                <property name="txSerializableEnabled" value="true"/>
                <property name="name" value="cache"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="TRANSACTIONAL"/>
                <property name="distributionMode" value="PARTITIONED_ONLY"/>
                <property name="backups" value="1"/>
                <property name="invalidate" value="true"/>
            </bean>

        </list>
    </property>

    <property name="discoverySpi">
        <bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.GridTcpDiscoveryMulticastIpFinder"/>
            </property>
        </bean>
    </property>

</bean>

 <!--Template for all example cache configurations.-->
<bean id="cache-template" abstract="true" class="org.gridgain.grid.cache.GridCacheConfiguration">
    <!-- Initial cache size. -->
    <property name="startSize" value="3000000"/>

    <!-- Set synchronous preloading (default is asynchronous). -->
    <property name="preloadMode" value="SYNC"/>

    <!-- Set to FULL_SYNC for examples, default is PRIMARY_SYNC. -->
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>

    <!-- Set to true to enable indexing for query examples, default value is false. -->
    <property name="queryIndexEnabled" value="true"/>
</bean>

提前致谢

4

1 回答 1

0

在我看来 PESSIMISTIC REPEATABLE_READ 事务更适合您的用例,并将提供类似或更好的性能。

但是,我已将您的问题告知 GridGain 团队,他们将尝试重现该问题并在必要时进行修复。

于 2014-04-10T23:13:54.600 回答