2

我正在运行一个 Storm 拓扑,它从 Kafka 代理读取数据并写入 Cassandra。我的 Cassandra 螺栓之一执行读写操作。我的键空间总是动态设置的。现在我想使用连接池连接到 Cassandra?

我的流中有键空间名称。我需要将数据动态插入适当的键空间/

1)我尝试使用方法内部的连接池方法获得 Cassandra 连接,execute以便每个元组都获得一个 Cassandra 连接。所以在某个时间点后,我的连接达到了我的线程 1024 池连接限制。之后出现连接超时错误。

例子:

ExecutorService pool = Executors.newFixedThreadPool(1024); 
public void execute(Tuple input)    {
  if(input.size()>0)        {          
     ConnectionManager cm=new ConnectionManager();     
     cm.keyspace = "dda400db5ef2d";
     statement = cm.poolRun();  
     cql="select * form columnfamily where id='d78346';
   }
}

2)我尝试prepare在拓扑初始化worker并获得静态连接时使用方法获得连接

public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {          
  _OutputCollector=collector;
  ConnectionManager cm=new ConnectionManager();
  cm.keyspace ="dda400db5ef2d";    statement = cm.poolRun();    
} 

public void execute(Tuple input)  { 
  if(input.size()>0) {          
    cql="select * form columnfamily where id='d78346';
  }
}

如果数据属于一个键空间,则第二种情况有效。但是我的案例数据属于不同的键空间,这里只有一个拓扑会识别键空间并写入该键空间。

风暴中是否有任何散列方法可用于保存键空间连接?

或者

还有其他逻辑吗?

4

2 回答 2

0

我对 Cassandra 不熟悉,但我认为这就是您想要的:

private ConnectionManager cm = null;
public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {          
  _OutputCollector=collector;
  cm=new ConnectionManager();    
}

public void execute(Tuple input)  { 
  cm.keyspace = "dda400db5ef2d";
  statement = cm.poolRun();  
  cql="select * form columnfamily where id='d78346';
}

prepare 方法在拓扑开始时运行一次。您可以使用它来初始化变量/连接并在您的执行方法中运行它们。查询完成后,您可能希望关闭连接并在下一个查询时重置连接。希望这可以帮助

于 2014-07-12T09:24:26.133 回答
0

做以下事情 -

1)不仅仅是 Cassandra 集群,在你的 Bolt 的准备方法中创建一个会话。创建会话时不要使用键空间。

public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {   
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
}

2) 现在根据您的数据创建查询。在这里,您的键空间名称应该以您的表名称为前缀。

public void execute(Tuple input)  { 
  if(input.size()>0) {          
     cql="select * form keyspace.table where id='d78346'";
  }
}
于 2019-08-30T23:22:39.893 回答