我正在运行一个 Storm 拓扑,它从 Kafka 代理读取数据并写入 Cassandra。我的 Cassandra 螺栓之一执行读写操作。我的键空间总是动态设置的。现在我想使用连接池连接到 Cassandra?
我的流中有键空间名称。我需要将数据动态插入适当的键空间/
1)我尝试使用方法内部的连接池方法获得 Cassandra 连接,execute
以便每个元组都获得一个 Cassandra 连接。所以在某个时间点后,我的连接达到了我的线程 1024 池连接限制。之后出现连接超时错误。
例子:
ExecutorService pool = Executors.newFixedThreadPool(1024);
public void execute(Tuple input) {
if(input.size()>0) {
ConnectionManager cm=new ConnectionManager();
cm.keyspace = "dda400db5ef2d";
statement = cm.poolRun();
cql="select * form columnfamily where id='d78346';
}
}
2)我尝试prepare
在拓扑初始化worker并获得静态连接时使用方法获得连接
public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
_OutputCollector=collector;
ConnectionManager cm=new ConnectionManager();
cm.keyspace ="dda400db5ef2d"; statement = cm.poolRun();
}
public void execute(Tuple input) {
if(input.size()>0) {
cql="select * form columnfamily where id='d78346';
}
}
如果数据属于一个键空间,则第二种情况有效。但是我的案例数据属于不同的键空间,这里只有一个拓扑会识别键空间并写入该键空间。
风暴中是否有任何散列方法可用于保存键空间连接?
或者
还有其他逻辑吗?