Here is my code that simply reads a column family using the Spark Cassandra connector:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import com.datastax.spark.connector.japi.SparkContextJavaFunctions;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;

public class Main {
    private static final String HOST = "spark://sparkmaster:7077";
    // private static final String HOST = "local[4]";

    private static final String APP_NAME = "Cassandra Spark WordCount";

    public static void main (String... args) {
        String[] jars = {
            "./build/libs/CassandraSparkMapReduce-1.0-SNAPSHOT.jar"
        };

        SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host", "107.108.214.154")
            .set("spark.executor.userClassPathFirst", "true")
            .setJars(jars);

        SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
        SparkContextJavaFunctions context = javaFunctions(sc);

        JavaRDD<String> rdd = context.cassandraTable("wordcount", "input")
            .map(row -> row.toString());

        System.out.println(rdd.toArray());
    }
}
Here is the build.gradle file that I use to build and run the application:
group 'in.suyash.tests'
version '1.0-SNAPSHOT'
apply plugin: 'java'
apply plugin: 'application'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.4.0'
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.10', version: '1.4.0-M1'
    compile group: 'com.datastax.spark', name: 'spark-cassandra-connector-java_2.10', version: '1.4.0-M1'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

sourceSets {
    main {
        java {
            srcDir './'
        }
    }
}

mainClassName = 'Main'

// http://stackoverflow.com/a/14441628/3673043
jar {
    doFirst {
        from {
            configurations.compile.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        }
    }

    exclude 'META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA'
}
I execute my job by first building the jar with gradle build, and then running it with gradle run. But the job fails, and looking at the stderr of my executors, I get the following exception:
java.lang.ClassCastException: org.apache.spark.scheduler.ResultTask cannot be cast to org.apache.spark.scheduler.Task
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
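For what it's worth, a cast failing between classes with compatible names (here, Spark's own scheduler classes) typically means the two classes were defined by different class loaders; this can happen, for example, when spark.executor.userClassPathFirst is set and Spark's classes are also bundled into the application jar, as the fat-jar build above does. A minimal, Spark-free sketch of the mechanism (all names here are illustrative, not Spark internals):

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class LoaderDemo {

    // The class we will load a second time through a different loader.
    public static class Payload {}

    // A loader that defines Payload itself instead of delegating to its parent,
    // mimicking a "user classpath first" lookup for a class that also exists
    // on the system classpath.
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(Payload.class.getName())) {
                try (InputStream in = getParent()
                        .getResourceAsStream(name.replace('.', '/') + ".class")) {
                    // Re-read the class bytes and define the class in THIS loader.
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
                    byte[] bytes = out.toByteArray();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String... args) throws Exception {
        Class<?> again = new IsolatingLoader(LoaderDemo.class.getClassLoader())
                .loadClass(Payload.class.getName());
        Object instance = again.getDeclaredConstructor().newInstance();

        // Same fully qualified name, but instanceof/cast checks fail,
        // because the defining class loaders differ.
        System.out.println(instance.getClass().getName());
        System.out.println(Payload.class.isInstance(instance)); // false
    }
}
```

In the JVM a class's identity is (name, defining loader), so two copies of the "same" class are unrelated types, which is exactly what a ClassCastException between a class and its apparent supertype looks like.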
I have a 3-node setup, in which one node acts as the Spark master, while the other two are Spark workers and also form a Cassandra ring. I can execute the job locally if I change my Spark host, but on the cluster I get this strange exception, which I could not find anywhere else. Versions:
- Spark: 1.4.0
- Cassandra: 2.1.6
- spark-cassandra-connector: 1.4.0-M1
EDIT
I cannot say exactly how I solved the problem, but I removed all Java installations from all of the nodes, restarted everything, installed a fresh copy of jdk1.8.0_45, started my cluster again, and now the job completes successfully. Any explanation of this behavior is welcome.
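Since reinstalling the JDK fixed it, one quick sanity check for a cluster like this is to compare the exact JVM each node reports before submitting jobs. A minimal sketch (run it on every node; the property names are standard JVM system properties):

```java
public class JvmReport {
    public static void main(String... args) {
        // Print the properties that identify this JVM installation;
        // version mismatches across driver/executor nodes can cause
        // hard-to-trace serialization and class-loading failures.
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("java.vendor  = " + System.getProperty("java.vendor"));
        System.out.println("java.home    = " + System.getProperty("java.home"));
    }
}
```

If the output differs between nodes (e.g. one worker still on an older jdk1.8.0_xx), aligning the installations, as done above, is a reasonable first step.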