
I am new to the Spark world and to Spark Job Server.

My code:

package spark.jobserver 

import java.nio.ByteBuffer 

import scala.collection.JavaConversions._ 
import scala.collection.mutable.ListBuffer 
import scala.collection.immutable.Map 

import org.apache.cassandra.hadoop.ConfigHelper 
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper 
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat 
import org.apache.cassandra.utils.ByteBufferUtil 
import org.apache.hadoop.mapreduce.Job 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 

object CassandraCQLTest extends SparkJob {

  def main(args: Array[String]) {   
    val sc = new SparkContext("local[4]", "CassandraCQLTest") 
    sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar")
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Result is " + "test") 
  } 

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = { 
    Try(config.getString("input.string")) 
      .map(x => SparkJobValid) 
      .getOrElse(SparkJobInvalid("No input.string config param")) 
  } 

  override def runJob(sc: SparkContext, config: Config): Any = { 
    val cHost: String = "localhost" 
    val cPort: String = "9160" 
    val KeySpace = "retail" 
    val InputColumnFamily = "ordercf" 
    val OutputColumnFamily = "salecount" 

    val job = new Job() 
    job.setInputFormatClass(classOf[CqlPagingInputFormat]) 
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) 
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) 
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) 
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") 
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") 

    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */ 

    /** An UPDATE writes one or more columns to a record in a Cassandra column family */ 
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? " 
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query) 

    job.setOutputFormatClass(classOf[CqlOutputFormat]) 
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) 
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) 
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) 
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") 

    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), 
      classOf[CqlPagingInputFormat], 
      classOf[java.util.Map[String,ByteBuffer]], 
      classOf[java.util.Map[String,ByteBuffer]]) 


    val productSaleRDD = casRdd.map { 
      case (key, value) => { 
        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity"))) 
      } 
    } 
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) 
    aggregatedRDD.collect().foreach { 
      case (productId, saleCount) => println(productId + ":" + saleCount) 
    } 

    val casoutputCF  = aggregatedRDD.map { 
      case (productId, saleCount) => { 
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) 
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey 
        var outColFamVal = new ListBuffer[ByteBuffer] 
        outColFamVal += ByteBufferUtil.bytes(saleCount) 
        val outVal: java.util.List[ByteBuffer] = outColFamVal 
        (outKey, outVal)
      } 
    } 

    casoutputCF.saveAsNewAPIHadoopFile( 
        KeySpace, 
        classOf[java.util.Map[String, ByteBuffer]], 
        classOf[java.util.List[ByteBuffer]], 
        classOf[CqlOutputFormat], 
        job.getConfiguration() 
      ) 
    casRdd.count 
  } 
} 

When I push the jar with spark-jobserver and execute it, I get the following on the spark-jobserver terminal:

job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat 
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46) 
job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21) 
job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235) 
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
job-server[ERROR] at java.lang.Thread.run(Thread.java:745) 
job-server[ERROR] Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
job-server[ERROR] at java.security.AccessController.doPrivileged(Native Method) 
job-server[ERROR] at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
job-server[ERROR] ... 8 more 

I have already pointed the $EXTRA_JAR variable at my cassandra-spark-connector assembly.


2 Answers


When you submit a program to Spark, you need to include all of its dependent jar files as a comma-separated list. Suppose your project structure looks like this:

 simpleapp
  - src/main/java
    - org.apache.spark.examples
      - SimpleApp.java
  - lib
    - dependent.jars (you can put all dependent jars inside the lib directory)
  - target
    - simpleapp.jar (after compiling your source)

Then you can submit with the command below:

spark-submit --jars $(echo lib/*.jar | tr ' ' ',' ) --class org.apache.spark.examples.SimpleApp --master local[2]  target/simpleapp.jar

You can also check jar distribution in the Spark web console: go to your application -> Environment and see whether the jar file Spark is complaining about is actually listed there.
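
Since the question goes through spark-jobserver rather than spark-submit, the jobserver-side equivalent is to list the connector assembly in the context configuration. A minimal sketch, assuming the job server reads a standard HOCON config file; the context name "cass-context" is made up for this example and the jar path is copied from the question:

# hypothetical context entry in the job server's .conf (HOCON);
# "cass-context" is an invented name, adjust the path to your environment
spark {
  contexts {
    cass-context {
      num-cpu-cores = 2
      memory-per-node = 512m
      # extra jars to put on the context's classpath
      dependent-jar-uris = ["file:///extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar"]
    }
  }
}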

Answered 2015-04-25T01:25:39.120

CqlPagingInputFormat ships in cassandra-all version 2.0.4 and is not found in other versions. At runtime, your application is picking up a Cassandra version newer than 2.0.4. You have to add this dependency to your pom:

<dependency>
  <groupId>org.apache.cassandra</groupId>
  <artifactId>cassandra-all</artifactId>
  <version>2.0.4</version>
</dependency>

That gives you the class.
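
If the build is sbt rather than Maven (the scala-2.10 assembly path in the question suggests sbt), the equivalent would be a line like this in build.sbt, again only as a sketch:

// hypothetical build.sbt line pinning the cassandra-all version that still ships CqlPagingInputFormat
libraryDependencies += "org.apache.cassandra" % "cassandra-all" % "2.0.4"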

But I can't guarantee that everything else will keep working.

Answered 2016-04-26T14:14:15.957