我是 Spark 新手。我正在尝试使用 spark-cassandra-connector 将 CSV 文件插入到 Cassandra 表中,做法如下:文件位于 HDFS 中,我先获取所有文件的路径,然后对每个路径调用一个方法;该方法把 CSV 数据转换为对应的 Cassandra 数据类型,创建预处理语句(prepared statement),将数据绑定到该语句并加入批处理(batch),每当批处理累积到 1000 条时执行一次。要点:1. 我使用的是 Apache Cassandra 2.1.8 和 Spark 1.5;2. 我使用 Spark Context 读取 CSV 文件;3. 我使用 com.datastax.spark.connector.cql.CassandraConnector 创建与 Cassandra 的会话。
我有 9 个文件,每个文件的数据都写入 Cassandra 中的一张表。直接用 spark-submit 提交 jar 时一切正常:所有插入都按预期完成,作业也正常结束。
我遇到的问题是:当通过 Web 服务调用同一个 jar 时(Web 服务调用一个脚本来提交该 jar),其中一个文件的数据没有被插入,而且 Spark 上下文没有停止,因此作业一直在运行。
当我只插入 4 个或 5 个文件时,即使通过 Web 服务也一切正常。但插入全部文件时作业会挂起,其中一张表少了 10 条记录,而且上下文没有停止。
这很奇怪:直接用 spark-submit 提交 jar 时一切正常,而通过 Web 服务却出现这个问题——尽管 Web 服务最终也是把作业交给同一个 spark-submit 来执行。
package com.pz.loadtocassandra;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.pz.shared.UnicodeBOMInputStream;
import com.pz.shared.fileformat.Header;
import com.pz.shared.mr.fileformat.MRFileFormats.CSVInputFormat;
import com.pz.shared.mr.fileformat.MRFileFormats.TextArrayWritable;
/**
 * Spark driver that loads CSV files from an HDFS directory into Cassandra, one table
 * per file, using the DataStax spark-cassandra-connector.
 */
public class LoadToCassandra {
// Column-name constants; not referenced in the visible part of this file.
public static final String STUDYID = "STUDYID";
public static final String PROJECTNAME = "PROJECTNAME";
public static final String FILEID = "FILEID";
// Batch-size counter read and reset inside the Spark map function in loadToCassandra.
// NOTE(review): a mutable static is shared, without synchronization, by every task that
// runs in the same JVM, and each executor JVM has its own copy. It therefore cannot
// reliably count one task's batch; a counter local to each partition's processing would.
public static int count = 0;
// Path separator ("SERPERATOR" is a misspelling kept for compatibility); not referenced
// in the visible part of this file.
public static final String FILE_SERPERATOR = "/";
// java.util.logging logger (the code logs with Level.SEVERE).
// NOTE(review): no import for Logger or Level is visible in this file's import list.
public static Logger log = Logger.getLogger(LoadToCassandra.class.getName());
/**
 * Command-line entry point: reads four positional arguments and calls {@link #run}.
 *
 * <p>Arguments: {@code args[0]} properties-file location, {@code args[1]} HDFS home URI,
 * {@code args[2]} HDFS path handed to the DTD/XML parser, {@code args[3]} HDFS directory
 * holding the CSV files to load. Nothing validates the arguments first, so fewer than
 * four of them causes an ArrayIndexOutOfBoundsException.
 *
 * <p>NOTE(review): the opening {@code try}, the closing braces and, presumably, the
 * FileHandler setup that the catch below refers to are missing from the visible source.
 *
 * @param args command-line arguments as described above
 */
public static void main(String[] args) {
String propFileLoc = args[0];
String hdfsHome = args[1];
String hdfs_DtdXmlPath = args[2];
String hdfs_NormalizedDataPath = args[3];
run(propFileLoc, hdfsHome, hdfs_DtdXmlPath,hdfs_NormalizedDataPath);
// run() catches its own IOException/URISyntaxException. Judging by the message, this
// catch is meant for the (not visible) log FileHandler setup.
} catch (IOException exception) {
log.log(Level.SEVERE, "Error occur in FileHandler.", exception);
public static void run(String propFileLoc, String hdfsHome,
String hdfs_DtdXmlPath, String hdfs_NormalizedDataPath) {
JavaSparkContext ctx = null;
FileSystem hadoopFs = null;
try {
//setting spark context
ctx = setSparkContext(propFileLoc);
ParseDtdXml.parseDTDXML(hdfsHome, hdfs_DtdXmlPath);
Configuration configuration = setHadoopConf();
hadoopFs = getHadoopFs(hdfsHome, configuration);
FileStatus[] fstat = hadoopFs.listStatus(new Path(hdfs_NormalizedDataPath));
//Getting the csv paths
Path[] paths = FileUtil.stat2Paths(fstat);
log.info("PATH.size - " + paths.length);
for (Path path : paths) {
log.info("path is : "+path.toString());
loadToCassandra(propFileLoc, path, configuration,hdfsHome, ctx);
} catch (IOException | URISyntaxException e) {
log.log(Level.SEVERE, "run method", e);
} finally {
log.info("finally ");
if (ctx!= null) {
System.out.println("SC Stopped");
if (hadoopFs != null) {
try {
} catch (IOException e) {
log.log(Level.SEVERE, "run method", e);
// input : 1. String hdfs home ,
// 2. Configuration hadoop conf object
// returns : hadoop File System object
private static FileSystem getHadoopFs(String hdfsHome,
Configuration configuration) throws IOException, URISyntaxException {
return FileSystem.get(new URI(hdfsHome), configuration);
// input : no inputs
// process : sets hadoop config parameters
// output : retuns hadoop conf object
private static Configuration setHadoopConf() throws IOException,
URISyntaxException {
Configuration configuration = new Configuration();
configuration.setBoolean("csvFileFormat.encoded.flag", true);
configuration.set("csvinputformat.token.delimiter", ",");
return configuration;
// input : string Properties File Location
// process : creates and sets the configurations of spark context
// retuns : JavaSparkContext object with configurations set to it.
private static JavaSparkContext setSparkContext(String propFileLoc) {
SparkConf conf = new SparkConf();
conf.setAppName("Loading Data");
return new JavaSparkContext(conf);
/**
 * Loads one normalized CSV file from HDFS into the Cassandra table named by the file name.
 *
 * <p>Visible steps:
 * <ol>
 *   <li>Derive the table name from the file name.</li>
 *   <li>Read the CSV header and build the column list. Tables other than PCMASTER get
 *       an extra leading {@code <TABLENAME>ID} column.</li>
 *   <li>Look up each column's type ("text" or "decimal") in
 *       {@code ParseDtdXml.tabColMapWithColTypes}.</li>
 *   <li>Build a prepared INSERT with {@link #makeSt}.</li>
 *   <li>Map over the file's records, converting and binding each one and adding it to a
 *       batch that is meant to run every 1000 statements.</li>
 * </ol>
 * Any Exception is caught at the end of the method (handler body not visible).
 *
 * <p>Several statements of this method are missing from the visible source: closing
 * braces, call arguments, and the batch add/execute and session close calls. The
 * comments below mark them.
 *
 * <p>NOTE(review): probable causes of "some rows missing / job never finishes", to confirm:
 * <ul>
 *   <li>The inserts happen inside {@code map}, which is lazy and may be re-run on task
 *       retry. Side-effecting writes belong in {@code foreachPartition} (one session and
 *       batch per partition, flushed at its end) or in the connector's own save API.</li>
 *   <li>{@code recCount} is the record count of the whole file, but {@code lineCount}
 *       lives in each task's own deserialized copy of the function. With more than one
 *       partition the "last batch" branch may never run, so the final partial batch is
 *       not executed and the session is not closed.</li>
 *   <li>The batch counter is the mutable static {@code LoadToCassandra.count}, shared
 *       without synchronization by every task in the same JVM.</li>
 *   <li>Every task copy starts {@code seqId} from the same {@code seqId1}. If that value
 *       feeds the generated ID column, IDs collide across partitions and later inserts
 *       overwrite earlier rows.</li>
 * </ul>
 *
 * @param propFileLoc        properties-file location; not referenced in the visible lines
 * @param sourceFileHdfsPath HDFS path of the CSV file; the table name is taken from the
 *                           text after '#' in the part of the file name before the first '-'
 * @param hadoopConf         Hadoop configuration used to obtain the file system
 * @param hdfsHome           HDFS URI used to obtain the Hadoop file system
 * @param ctx                the driver's Spark context
 */
private static void loadToCassandra(String propFileLoc, Path sourceFileHdfsPath,
Configuration hadoopConf, String hdfsHome,JavaSparkContext ctx) {
System.out.println("File :: " + sourceFileHdfsPath.toString());
FileSystem hadoopFs = null;
String cassKeyspaceName = PropInitialize.cass_keyspace_name;
log.info("entered here for file "+sourceFileHdfsPath.toString());
// strInputFileName (text before the first '#', lower-cased) is not read in the visible lines.
final String strInputFileName = StringUtils.split(
sourceFileHdfsPath.getName(), "#")[0].toLowerCase();
// Table name: the text after '#' in the first '-'-separated part of the file name.
// NOTE(review): this statement is cut off (no terminating ';') in the visible source.
final String strTableNameInCass = StringUtils.split(
sourceFileHdfsPath.getName(), "-")[0].split("#")[1]
final String strSourceFilePath = sourceFileHdfsPath.toString();
try {
// NOTE(review): hadoopFs is not read in the visible lines. It presumably opens the file
// for the header read below, whose constructor arguments are cut off; confirm.
hadoopFs = getHadoopFs(hdfsHome, hadoopConf);
// Cassandra connector built from the Spark context's configuration. It is captured by
// the map function below, which uses it to open a session.
final CassandraConnector connector = getCassandraConnection(ctx);
// Only used to count the rows already in the target table (the seed for generated IDs).
final JavaRDD<CassandraRow> cassTableObj=getCassTableObj(ctx,cassKeyspaceName,strTableNameInCass);
final Map<String, List<String>> tabColMapWithColTypes1 = ParseDtdXml.tabColMapWithColTypes;
final String headersUpdated;
final String headers;
// Reads the CSV header row. NOTE(review): the constructor arguments are cut off and no
// close() of this stream is visible; try-with-resources would guarantee it is closed.
UnicodeBOMInputStream ubis = new UnicodeBOMInputStream(
Header CsvHeader = Header.getCSVHeader(ubis, ",");
if (!strTableNameInCass.equalsIgnoreCase("PCMASTER")) {
// Non-PCMASTER tables: comma-join the header names and prepend "<TABLENAME>ID",
// the generated ID column.
String fString = "";
for (int i = 0; i < CsvHeader.size() - 1; i++) {
fString = fString + CsvHeader.get(i).ColumnName + ",";
fString = fString
+ CsvHeader.get(CsvHeader.size() - 1).ColumnName;
headers = fString; // StringUtils.join(stringArr.toString(),",");
headersUpdated = strTableNameInCass.toUpperCase() + "ID,"
+ headers;
} else {
// PCMASTER: the CSV header is used as-is (no generated ID column).
String[] stringArr = new String[CsvHeader.size()];
String fString = "";
for (int i = 0; i < CsvHeader.size() - 1; i++) {
// stringArr[i] = CsvHeader.get(i).ColumnName;
fString = fString + CsvHeader.get(i).ColumnName + ",";
fString = fString
+ CsvHeader.get(CsvHeader.size() - 1).ColumnName;
// BUG(review): stringArr is never filled (the assignment above is commented out).
// String[].toString() is also the array's identity string, not its contents, so
// headers gets a meaningless value here. It is not read again in this branch
// (headersUpdated is used instead), but it should be fString.
headers = StringUtils.join(stringArr.toString(), ",");
headersUpdated = fString;
// Reading the file using the Spark context
// (the remaining newAPIHadoopFile arguments are cut off in the visible source).
JavaPairRDD<LongWritable, TextArrayWritable> fileRdd = ctx
.newAPIHadoopFile(strSourceFilePath, CSVInputFormat.class,
LongWritable.class, TextArrayWritable.class,
// Total number of records in the whole file; count() runs a separate Spark job.
// Compared further down with a per-task line counter (see the NOTE in the header).
final long recCount = fileRdd.count();
final String[] actCols = headersUpdated.split(",");
// Column position -> Cassandra type ("text" / "decimal"). The put(...) calls are cut
// off below; presumably keyed by the column index i, since call() looks up get(i).
final LinkedHashMap<Object, String> mapOfColNameAndType = new LinkedHashMap<Object, String>();
// Entries of the form "<COLUMN> <type>" for this table (lookup expression cut off).
final List<String> colNameAndType = tabColMapWithColTypes1
for (int i = 0; i < actCols.length; i++) {
if (colNameAndType.contains(actCols[i] + " " + "text")) {
int indexOfColName = colNameAndType.indexOf(actCols[i]
+ " " + "text");
colNameAndType.get(indexOfColName).split(" ")[1]);
} else if (colNameAndType
.contains(actCols[i] + " " + "decimal")) {
int indexOfColName = colNameAndType.indexOf(actCols[i]
+ " " + "decimal");
colNameAndType.get(indexOfColName).split(" ")[1]);
} else {
// Builds the positional INSERT used for the prepared statement.
final String makeStatement = makeSt(cassKeyspaceName,
strTableNameInCass, actCols);
// Number of rows already in the table. Every task copy of the function below
// starts its seqId from this same value.
final long seqId1 = cassTableObj.count();
// Calling map on the fileRdd.
// NOTE(review): map is lazy, so nothing below runs until an action consumes data (not
// visible here). A retried task also re-runs its inserts.
JavaRDD<String> data = fileRdd.values().map(
new Function<TextArrayWritable, String>() {
private static final long serialVersionUID = 1L;
// Per-copy state: each Spark task works on its own deserialized copy of this
// function, so these fields are per task, not per file.
Session session;
boolean isssession = false;
PreparedStatement statement;
BatchStatement batch;
int lineCount = 0;
long seqId = seqId1;
/* For each CSV record (a TextArrayWritable): convert each cell to the
* corresponding Cassandra type, bind the values to the prepared statement
* and add the bound statement to the batch.
* NOTE(review): the closing star-slash of this comment is missing in the visible
* source, so as written everything after this point is commented out.
public String call(TextArrayWritable tup)
throws Exception {
// NOTE(review): this logs once per record. If the logger writes to the console and the
// job is started by a launcher that does not drain stdout/stderr, this volume can
// fill the pipe and block the job.
log.info("entered here 3 for file "+strSourceFilePath.toString());
String[] part = tup.toStrings();
// For non-PCMASTER tables the row presumably gets a generated ID first, matching the
// extra ID column in actCols (see getDataWithUniqueId).
Object[] parts = getDataWithUniqueId(
strTableNameInCass, part);
// First call in this task copy: open the session, prepare the INSERT and create
// the batch. NOTE(review): the session is only released on the "last batch" path.
if (!isssession) {
session = connector.openSession();
statement = session.prepare(makeStatement);
log.info("entered here 4 for file "+strSourceFilePath.toString());
// System.out.println("statement :" +
// statement);
isssession = true;
batch = new BatchStatement();
// Converted cell values, in column order, to bind into the INSERT.
List<Object> typeConvData = new ArrayList<Object>();
for (int i = 0; i < parts.length; i++) {
String type = mapOfColNameAndType.get(i);
try {
if (type.equalsIgnoreCase("text")) {
} else {
// parts[i] =
// parts[i].toString().replace("\"",
// "");
// Non-text columns: anything that is not an optional '-' followed by digits is
// replaced with "0" to avoid a NumberFormatException.
// NOTE(review): this also turns real decimals such as "12.5" into 0, and
// Long.valueOf below overflows for values beyond the long range.
// new BigDecimal(String) would keep the actual value.
if (!((String) parts[i])
.matches("-?\\d+")) {
parts[i] = "0";
long s = Long
.valueOf((String) parts[i]);
// NOTE(review): conversion failures are only logged. The row is still bound below,
// possibly with fewer values than columns.
} catch (NullPointerException e) {
log.log(Level.SEVERE, "loadToCass method", e);
} catch (NumberFormatException e) {
log.log(Level.SEVERE, "loadToCass method", e);
} catch (InvalidTypeException e) {
log.log(Level.SEVERE, "loadToCass method", e);
List<Object> data = typeConvData;
// Bind the converted values to the prepared INSERT.
final BoundStatement query = statement.bind(data
.toArray(new Object[data.size()]));
// Add the bound statement to the batch. The add call and the counter increments are
// not visible in this source.
int count = LoadToCassandra.count;
// Every 1000 statements: execute the batch (call cut off), reset the shared counter
// and start a new batch. NOTE(review): LoadToCassandra.count is a static shared by
// all tasks in this JVM without synchronization.
if (count == 1000) {
log.info("entered here 5 for file "+strSourceFilePath.toString());
log.info("batch done");
LoadToCassandra.count = 0;
batch = new BatchStatement();
return StringUtils.join(tup.toStrings());
// Final partial batch: execute it and close the session (calls cut off).
// NOTE(review): recCount counts the whole file but lineCount only this task's records,
// so with several partitions this condition may never be true.
if (lineCount == (recCount))
log.info("Last Batch");
log.info("entered here 6 for file "+strSourceFilePath.toString());
log.info("Session closed");
return StringUtils.join(tup.toStrings());
// Builds the values for one row. For non-PCMASTER tables an ID value presumably goes
// first (the lines that fill tempArraylist are cut off; likely seqId-based, confirm).
// PCMASTER rows are returned unchanged.
private Object[] getDataWithUniqueId(
String strTableNameInCass, String[] part) {
Object[] parts = null;
ArrayList<String> tempArraylist = new ArrayList<String>();
if (!strTableNameInCass
.equalsIgnoreCase("PCMASTER")) {
for (int i = 0; i < part.length; i++) {
if (i == 0) {
parts = tempArraylist.toArray();
} else {
parts = part;
return parts;
// Any failure while loading this file ends up here (handler body not visible).
} catch (Exception e) {
/**
 * Returns a Spark RDD of {@link CassandraRow}s, presumably over
 * {@code cassKeyspaceName.strTableNameInCass}.
 *
 * <p>In the visible code its only use is counting the table's existing rows to seed the
 * generated ID sequence. That count presumably has to read the whole table.
 *
 * <p>NOTE(review): the return statement below is cut off in the visible source, and no
 * import for {@code javaFunctions} is visible. It probably reads
 * {@code CassandraJavaUtil.javaFunctions(ctx).cassandraTable(cassKeyspaceName, strTableNameInCass)}
 * (spark-cassandra-connector Java API); confirm.
 *
 * @param ctx                the driver's Spark context
 * @param cassKeyspaceName   keyspace name
 * @param strTableNameInCass table name
 * @return RDD of table rows
 */
private static JavaRDD<CassandraRow> getCassTableObj(
JavaSparkContext ctx, String cassKeyspaceName,
String strTableNameInCass) {
return javaFunctions(ctx)
private static CassandraConnector getCassandraConnection(
JavaSparkContext ctx) {
return CassandraConnector.apply(ctx.getConf());
private static String makeSt(String keyspace, String tabName,
String[] colNames) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO " + keyspace + "." + tabName + " ( ");
List<String> vars = new ArrayList<>();
for (int i = 0; i < (colNames.length - 1); i++) {
sb.append(colNames[i] + ",");
sb.append(colNames[colNames.length - 1] + " ) values ( "
+ StringUtils.join(vars, ",") + " ) ");
return sb.toString();