我可以使用以下代码片段连接到 AWS Managed Cassandra Service。
CassandraSink.addSink(cassandraEntityStream)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withCredentials(
"username",
"password")
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc("ap-northeast-1")
.build())
//.withQueryOptions(option)
.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.build()
.name("Write to Cassandra")
.uid("cassandra_sink");
在将 Stream POJO 写入 Cassandra 时,我遇到了以下异常。
com.datastax.driver.core.exceptions.InvalidQueryException:此操作不支持一致性级别 LOCAL_ONE。支持的一致性级别是:LOCAL_QUORUM
通过使用以下代码段设置 ConsistencyLevel = LOCAL_QUORUM,我能够在另一个项目中解决此问题(不使用 flink)。
QueryOptions option = new QueryOptions();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
final Cluster cluster =
Cluster.builder()
.addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withQueryOptions(option) // NOTE
.withAuthProvider(
new PlainTextAuthProvider(
"username",
"password"))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy.builder().withLocalDc("ap-northeast-1").build())
.build();
final Session session = cluster.connect("test");
当我在 flink 中尝试相同时,我收到以下错误:
线程“主”org.apache.flink.api.common.InvalidProgramException 中的异常:com.datastax.driver.core.QueryOptions@130161f7 不可序列化。该对象可能包含或引用不可序列化的字段。
有什么,我错过了吗?请详细说明如何使用 Flink Cassandra 连接器连接/写入 MCS。
PS:
- 我使用下面的命令来创建键空间。
CREATE KEYSPACE "test"
WITH
REPLICATION = {'class': 'SingleRegionStrategy'}
我没有在我的代码中使用 AmazonRootCA1.pem。
我没有在我的代码或环境中使用 cassandra_truststore.jks。
我已经安装了证书
temp_file.der
证书,它是按照这些步骤创建的。我使用的是 Flink 1.8.2,因为这是 Kinesis Data Analytics 中可用的环境版本。
2020 年 7 月 4 日更新
我可以通过为 QueryOptions 创建一个 Serializable 包装器来解决序列化问题。请在下面找到代码片段:
import com.datastax.driver.core.QueryOptions;
import java.io.Serializable;
public class QueryOptionsSerializable extends QueryOptions implements Serializable {
private static final long serialVersionUID = 2793938419775311824L;
}
使用此解决方案,我能够在代码中将一致性级别设置为 LOCAL_QUORUM 并在没有任何异常的情况下运行。
// Setting consistency level
QueryOptionsSerializable option = new QueryOptionsSerializable();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
CassandraSink.addSink(entityStream)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
Cluster.Builder tempBuilder = builder.addContactPoint(host).withPort(port);
if (isSSLEnabled) {
// enable SSL config if isSSLEnabled flag is ON.
tempBuilder.withSSL();
}
if (username != null && password != null) {
// if username & password is provided, use it for connection.
tempBuilder.withCredentials(username, password);
}
tempBuilder.withQueryOptions(option);
return tempBuilder.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.setDefaultKeyspace(keyspace)
.build()
.name("Write to Cassandra")
.uid("cassandra_sink");
但是在写信给 MCS 时,我遇到了同样的错误:
com.datastax.driver.core.exceptions.InvalidQueryException:此操作不支持一致性级别 LOCAL_ONE。支持的一致性级别是:LOCAL_QUORUM
任何帮助将不胜感激!