我正在尝试将 Ignite 与 Cassandra 集成。我已完成配置并启动了 Ignite 节点,但无法通过 Ignite 缓存向 Cassandra 数据库插入数据,也无法从中读取数据。我在 Cassandra 中创建了 keyspace 和表,并插入了一些值。但是当我尝试读取这些值时,会抛出异常;尝试插入值时也会出现同样的异常。
我使用的版本:Ignite 2.6 | cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Spark 2.3.0 | Scala 2.11.8 | Cassandra driver core 3.0.0 | Guava 19.0
这是 Ignite 的配置。
配置3.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Ignite node configuration.
  Defines one cache, "cache1", backed by Cassandra (read-through and
  write-through) and configures TCP discovery for the node.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Cassandra connection settings (provides the "cassandraRegularDataSource" bean referenced below). -->
    <import resource="file:/etc/ignite/config/cassandra_config/connection-settings3.xml"/>

    <!-- Persistence settings for 'cache1': maps cache keys/values to Cassandra columns. -->
    <bean id="cache1_persistence_settings"
          class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
        <constructor-arg type="org.springframework.core.io.Resource"
                         value="file:/etc/ignite/config/cassandra_config/persistence-settings-3.xml"/>
    </bean>

    <!-- Ignite configuration -->
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!--
          Enabling ODBC.
          NOTE(review): OdbcConfiguration is believed to be deprecated in Ignite 2.x in favour of
          ClientConnectorConfiguration; confirm against the Ignite version in use before changing it.
        -->
        <property name="odbcConfiguration">
            <bean class="org.apache.ignite.configuration.OdbcConfiguration"/>
        </property>

        <property name="cacheConfiguration">
            <list>
                <!-- Configuring persistence for "cache1" cache -->
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="cache1"/>
                    <!-- Cache misses are loaded from Cassandra; cache updates are written to Cassandra. -->
                    <property name="readThrough" value="true"/>
                    <property name="writeThrough" value="true"/>
                    <property name="cacheStoreFactory">
                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
                            <property name="dataSourceBean" value="cassandraRegularDataSource"/>
                            <property name="persistenceSettingsBean" value="cache1_persistence_settings"/>
                        </bean>
                    </property>

                    <!--
                      Query fields configuration.
                      The key and value types must agree with the keyPersistence / valuePersistence
                      classes declared in persistence-settings-3.xml (Integer key, String value);
                      otherwise the SQL view of the cache and the Cassandra mapping describe
                      different types for the same entries.
                    -->
                    <property name="queryEntities">
                        <list>
                            <bean class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType" value="java.lang.Integer"/>
                                <property name="valueType" value="java.lang.String"/>
                            </bean>
                        </list>
                    </property>
                    <!-- Query fields configuration END -->
                </bean>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                      Ignite provides several options for automatic discovery that can be used
                      instead of static IP based discovery. For information on all options refer
                      to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!--
                      The multicast IP finder is active here, seeded with the address list below.
                      To use static IP based discovery of initial nodes instead, replace the bean
                      class with:
                      org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
                    -->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
connection-settings3.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Cassandra connection settings.
  Declares the driver load balancing policy and the data source bean
  ("cassandraRegularDataSource") used by the Ignite Cassandra cache store.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Token-aware policy wrapping a round-robin child policy. -->
    <bean id="loadBalancingPolicy"
          class="com.datastax.driver.core.policies.TokenAwarePolicy">
        <constructor-arg type="com.datastax.driver.core.policies.LoadBalancingPolicy">
            <bean class="com.datastax.driver.core.policies.RoundRobinPolicy"/>
        </constructor-arg>
    </bean>

    <!-- Data source: where to connect, how to balance requests, and the consistency levels to use. -->
    <bean id="cassandraRegularDataSource"
          class="org.apache.ignite.cache.store.cassandra.datasource.DataSource">
        <!-- Initial Cassandra hosts the driver contacts. -->
        <property name="contactPoints">
            <list>
                <value>127.0.0.1</value>
            </list>
        </property>

        <property name="loadBalancingPolicy" ref="loadBalancingPolicy"/>

        <!-- Reads and writes both use QUORUM consistency. -->
        <property name="readConsistency" value="QUORUM"/>
        <property name="writeConsistency" value="QUORUM"/>
    </bean>
</beans>
persistence-settings-3.xml
<persistence keyspace="ignite" table="cache_test" ttl="86400">
    <!-- Maps the cache to table "cache_test" in keyspace "ignite"; the ttl attribute is set to 86400. -->

    <!-- Options text for the keyspace. -->
    <keyspaceOptions>
        REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3}
        AND DURABLE_WRITES = true
    </keyspaceOptions>

    <!-- Options text for the table. -->
    <tableOptions>
        comment = 'Cache test'
        AND read_repair_chance = 0.2
    </tableOptions>

    <!--
      Previous mapping, kept for reference (String key, Integer value):
      <keyPersistence class="java.lang.String" strategy="PRIMITIVE" column="key" />
      <valuePersistence class="java.lang.Integer" strategy="PRIMITIVE" column="value" />
    -->

    <!-- Active mapping: Integer key in column "key", String value in column "value". -->
    <keyPersistence class="java.lang.Integer" strategy="PRIMITIVE" column="key"/>
    <valuePersistence class="java.lang.String" strategy="PRIMITIVE" column="value"/>
</persistence>
我得到的错误信息如下:
ERROR CassandraCacheStore:586 - Failed to execute Cassandra CQL statement: select "value" from "ignite"."cache_test" where "key"=?;
class org.apache.ignite.IgniteException: Failed to execute Cassandra CQL statement: select "value" from "ignite"."cache_test" where "key"=?;
at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:167)
at org.apache.ignite.cache.store.cassandra.CassandraCacheStore.load(CassandraCacheStore.java:189)
at org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper.load(CacheStoreBalancingWrapper.java:98)
at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.loadFromStore(GridCacheStoreManagerAdapter.java:327)
at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.load(GridCacheStoreManagerAdapter.java:293)
at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.loadAllFromStore(GridCacheStoreManagerAdapter.java:434)
at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.loadAll(GridCacheStoreManagerAdapter.java:400)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter$18.call(GridCacheAdapter.java:2046)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter$18.call(GridCacheAdapter.java:2044)
at org.apache.ignite.internal.processors.cache.GridCacheContext$3.call(GridCacheContext.java:1435)
at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:6695)
at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:967)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: class org.apache.ignite.IgniteException: Failed to prepare Cassandra CQL statement: select "value" from "ignite"."cache_test" where "key"=?;
at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.prepareStatement(CassandraSessionImpl.java:621)
at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:137)
... 15 more
Caused by: class org.apache.ignite.IgniteException: Failed to establish session with Cassandra database
at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.session(CassandraSessionImpl.java:555)
at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.prepareStatement(CassandraSessionImpl.java:603)
... 16 more
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
我使用以下命令启动 spark-shell:
# Start spark-shell with the Ignite jars on both the executor and the driver classpath.
# Both settings use the same classpath string, so it is defined once.
IGNITE_CP="/etc/ignite/libs/*:/etc/ignite/libs/optional/ignite-spark/*:/etc/ignite/libs/optional/ignite-log4j/*:/etc/ignite/libs/optional/ignite-yarn/*:/etc/ignite/libs/ignite-spring/*:/etc/ignite/libs/ignite-cassandra-store/*"

# NOTE(review): the ignite-spark package version below (2.3.0) differs from the Ignite
# version stated for this setup (2.6); confirm that this is intended.
# NOTE(review): the NoSuchMethodError on Futures.withFallback in the stack trace points to a
# Guava version on this classpath that lacks the method the Cassandra driver calls; check
# which Guava jar is loaded first.
bin/spark-shell \
  --conf "spark.executor.extraClassPath=${IGNITE_CP}" \
  --conf "spark.driver.extraClassPath=${IGNITE_CP}" \
  --packages org.apache.ignite:ignite-spark:2.3.0 \
  --repositories https://repo.maven.apache.org/maven2