我正在使用批处理语句将来自 csv 文件的数据插入 Cassandra。我的表看起来像这样创建表曝光(expoid bigint,fileid bigint,studyid text, projname text, w text, x text, y text, z text)
System.out.println(colDataMap);
String keyspace = "orchtablespaces";
String tabName = fileName;
//String tableFile = "/home/blr-lt-202/empAccount.txt";
String tableFile = fname;
Set<String> colNamesSet = colDataMap.keySet();
String[] colNames = colNamesSet.toArray(new String[colNamesSet.size()]);
System.out.println("ColNames ::" +colNames);
String makeStatement = makeSt(keyspace,tabName,colNames);
System.out.println("makeStatement ::"+makeStatement);
if(count==65534)
{
session.executeAsync(bs);
count = 0;
bs = new BatchStatement();
}
PreparedStatement statement = session.prepare(makeStatement);
//bcz expo id and seq_n
String expoid =(String) colDataMap.get("expoid");
String Seq_No = (String) colDataMap.get("Seq_No");
colDataMap.put(expoid, Long.valueOf(expoid));
colDataMap.put("Seq_No", Long.valueOf(Seq_No));
BoundStatement query = statement.bind(colDataMap.values().toArray(new Object[colDataMap.size()]));
//BoundStatement query = statement.bind(colDataMap.get("Seq_No"),colDataMap.get("fileId"),colDataMap.get("studyId"),colDataMap.get("projectName"),colDataMap.get("dosetxt"),colDataMap.get("sdurtunit"),colDataMap.get("durtunit"));
System.out.println("query "+query);
bs.add(query);
count++;
}
session.executeAsync(bs);
session.close();
将Map
ColDataMap
所有 Cassandra Table col 名称作为键及其值在地图中如下所示
{Seq_No=0, fileId=123, studyId=786, w=PreCLinic, x=Tasq 30mg/kg, y=12 days, z=12 days}
Set<String> colNamesSet =colDataMap.keySet();
String[] colNames = colNamesSet.toArray(new String[colNamesSet.size()]);
System.out.println("ColNames ::" +colNames);
String makeStatement = makeSt(keyspace,tabName,colNames);
System.out.println("makeStatement ::"+makeStatement);
makeSt 方法准备prepared Statement
makeStatement ::INSERT INTO orchtablespaces.EXPOSURES ( Seq_No,fileId,studyId,w,x,y,z ) values ( ?,?,?,?,?,?,? )
这一切都很好。然后我执行以下操作
PreparedStatement statement = session.prepare(makeStatement);
//bcz expoid and seq_no are of type bigint int the cassandra table
String expoid =(String) colDataMap.get("expoid");
String Seq_No = (String) colDataMap.get("Seq_No");
colDataMap.put(expoid, Long.valueOf(expoid));
colDataMap.put("Seq_No", Long.valueOf(Seq_No));
BoundStatement query = statement.bind(colDataMap.values().toArray(new Object[colDataMap.size()]));
当我运行程序时,我收到以下错误
com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)com.test.load.microarr.CopyOfLoadMicroArr.transformSourceFile(CopyOfLoadMicroArr.java:486)com.test.load.microarr.CopyOfLoadMicroArr.loadData(CopyOfLoadMicroArr.java:149)
com.test.load.microarr.CopyOfLoadMicroArr.main(CopyOfLoadMicroArr.java:114)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
com.datastax.driver.core.SessionManager.prepareAsync(SessionManager.java:124)
com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:77)
... 3 more
com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:79)
程序中断的确切行是第 486 行,即
PreparedStatement statement = session.prepare(makeStatement);
我正在使用 DataStax Cassandra 2.1.8 .. 任何建议和解决方案都会非常有帮助!谢谢 !