我正在学习 GridGain 中事务的工作原理,遇到了以下问题:即使使用 SERIALIZABLE 隔离级别,乐观(OPTIMISTIC)模式也无法保证一致性。下面是能够重现该问题的单元测试示例:
/**
 * Prepares the fixture before each test: fixes the number of accounts and
 * worker threads, obtains an Integer-to-Long projection of the cache named
 * "cache", and stores version 0 for every account id.
 *
 * @throws GridException if a cache operation fails
 */
@Before
public void init() throws GridException {
    final GridCache<Object, Object> gridCache = grid.cache("cache");
    accounts = 1;
    threads = 2;
    accountVersions = gridCache.projection(Integer.class, Long.class);
    // Seed every account with the initial version.
    int accountId = 0;
    while (accountId < accounts) {
        accountVersions.putx(accountId, 0L);
        accountId++;
    }
}
/**
 * Runs the concurrent version-increment scenario with OPTIMISTIC concurrency
 * and SERIALIZABLE isolation.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the workers
 * @throws GridException if a cache operation fails
 */
@Test
public void testOptimisticTransaction() throws InterruptedException, GridException {
    final GridCacheTxConcurrency concurrency = GridCacheTxConcurrency.OPTIMISTIC;
    final GridCacheTxIsolation isolation = GridCacheTxIsolation.SERIALIZABLE;
    testWithTransactionSetting(concurrency, isolation);
}
/**
 * Starts {@code threads} workers that each call {@link #makeOperation(int)}
 * a fixed number of times for account 0, then verifies that
 * <ul>
 *   <li>no version was handed out to more than one worker, and</li>
 *   <li>the set of versions received by the workers equals {@code 1..N},
 *       where {@code N} is the version currently stored in the cache.</li>
 * </ul>
 *
 * @param txConcurrency transaction concurrency mode used by {@code makeOperation}
 * @param txIsolation   transaction isolation level used by {@code makeOperation}
 * @throws InterruptedException if the test thread is interrupted while waiting for the workers
 * @throws GridException        if reading the final versions from the cache fails
 */
private void testWithTransactionSetting(GridCacheTxConcurrency txConcurrency, GridCacheTxIsolation txIsolation) throws InterruptedException, GridException {
    //given
    this.txConcurrency = txConcurrency;
    this.txIsolation = txIsolation;
    final int operation = 20;
    final int operationsPerThread = operation / threads;
    final SortedSetMultimap<Integer, Long> map = TreeMultimap.create();
    final CountDownLatch latch = new CountDownLatch(threads);
    class Job implements Runnable {
        private final SortedSetMultimap<Integer, Long> completedOperations = TreeMultimap.create();
        // First assertion failure seen by this worker. It is written before
        // latch.countDown() and read only after latch.await().
        private AssertionError assertionFailure;
        @Override
        public void run() {
            try {
                for (int j = 0; j < operationsPerThread; j++) {
                    int accountId = 0;
                    try {
                        long version = makeOperation(accountId);
                        // assertNotSame(-1, version) compared a boxed Integer with a boxed
                        // Long by reference and could never fail; compare the values instead.
                        assertTrue("Version can't be -1", version != -1);
                        completedOperations.put(accountId, version);
                    } catch (AssertionError failed) {
                        // Keep it for the main thread; a failure thrown here would
                        // otherwise be lost together with the worker thread.
                        if (assertionFailure == null) {
                            assertionFailure = failed;
                        }
                    } catch (Throwable th) {
                        // A failed operation is reported but does not stop the worker.
                        th.printStackTrace();
                    }
                }
            } finally {
                // Always release the latch so the test thread cannot hang.
                latch.countDown();
            }
        }
    }
    List<Job> jobs = new ArrayList<>();
    //when
    for (int i = 0; i < threads; i++) {
        Job job = new Job();
        jobs.add(job);
        new Thread(job).start();
    }
    latch.await();
    // Surface assertion failures raised inside the workers.
    for (Job job : jobs) {
        if (job.assertionFailure != null) {
            throw job.assertionFailure;
        }
    }
    for (Job job : jobs) {
        System.out.println(job.completedOperations);
        // keySet() visits each account once; keys() would repeat it per stored value.
        for (int accountId : job.completedOperations.keySet()) {
            Sets.SetView<Long> intersection = Sets.intersection(job.completedOperations.get(accountId), map.get(accountId));
            assertTrue("makeOperation returns not unique versions\n Some of versions are already used for accountID: " + accountId + ";intersection: " + intersection, intersection.isEmpty());
        }
        map.putAll(job.completedOperations);
    }
    //then
    long totalInCache = 0;
    long totalInClient = 0;
    for (int i = 0; i < accounts; i++) {
        totalInCache += accountVersions.get(i);
        totalInClient += map.get(i).size();
    }
    System.out.printf("Operations in cache: %d; in client: %d\n", totalInCache, totalInClient);
    for (int i = 0; i < accounts; i++) {
        long version = accountVersions.get(i);
        SortedSet<Long> receivedVersions = map.get(i);
        // The cache only stores the latest version, so the expected history is 1..version.
        SortedSet<Long> versionsInCache = new TreeSet<>();
        for (long j = 1; j <= version; j++) {
            versionsInCache.add(j);
        }
        assertEquals("versions for account: " + i + " are not matched in cache and in client: ", versionsInCache, receivedVersions);
    }
}
/**
 * Increments the version stored for {@code accountId} inside a transaction
 * started with the configured concurrency and isolation settings, retrying
 * when a {@code GridException} is thrown.
 *
 * @param accountId key of the account whose version is incremented
 * @return the version written by the committed transaction, or {@code -1}
 *         if the retry loop finishes without a commit or an exception
 * @throws RuntimeException wrapping the {@code GridException} raised by the last allowed attempt
 */
private long makeOperation(int accountId) {
    int attempt = 0;
    while (attempt < MAX_RETRIES) {
        try (GridCacheTx tx = accountVersions.txStart(txConcurrency, txIsolation)) {
            final long nextVersion = 1 + accountVersions.get(accountId);
            accountVersions.put(accountId, nextVersion);
            tx.commit();
            return nextVersion;
        } catch (GridException e) {
            // Only the final attempt propagates the failure; earlier ones retry.
            final boolean lastAttempt = (attempt == MAX_RETRIES - 1);
            if (lastAttempt) {
                throw new RuntimeException(e);
            }
        }
        attempt++;
    }
    return -1;
}
XML 配置文件:
<!-- Grid configuration: binds to 127.0.0.1, declares one cache named "cache" and sets up TCP discovery with a multicast IP finder. -->
<bean id="grid.cfg" scope="singleton" class="org.gridgain.grid.GridConfiguration">
<property name="localHost" value="127.0.0.1"/>
<property name="cacheConfiguration">
<list>
<!-- Partitioned cache configuration. Note: atomicityMode below is TRANSACTIONAL, not ATOMIC. -->
<bean parent="cache-template">
<!-- NOTE(review): presumably needed for the SERIALIZABLE isolation level used by the test; confirm against the GridGain documentation. -->
<property name="txSerializableEnabled" value="true"/>
<!-- Cache name; the test obtains this cache via grid.cache("cache"). -->
<property name="name" value="cache"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="distributionMode" value="PARTITIONED_ONLY"/>
<property name="backups" value="1"/>
<!-- NOTE(review): exact effect of invalidate=true on transactional updates is not visible here; verify against the GridGain documentation. -->
<property name="invalidate" value="true"/>
</bean>
</list>
</property>
<property name="discoverySpi">
<bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.multicast.GridTcpDiscoveryMulticastIpFinder"/>
</property>
</bean>
</property>
</bean>
<!-- Abstract template for cache configurations; concrete cache beans inherit these properties via parent="cache-template". -->
<bean id="cache-template" abstract="true" class="org.gridgain.grid.cache.GridCacheConfiguration">
<!-- Initial cache size. -->
<property name="startSize" value="3000000"/>
<!-- Set synchronous preloading (default is asynchronous). -->
<property name="preloadMode" value="SYNC"/>
<!-- Set to FULL_SYNC for examples, default is PRIMARY_SYNC. -->
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
<!-- Set to true to enable indexing for query examples, default value is false. -->
<property name="queryIndexEnabled" value="true"/>
</bean>
提前致谢