1

我正在尝试了解如何将 Spark 作业提交给 Apache Livy。

我在我的 POM.xml 中添加了以下 API:

 <dependency>
     <groupId>com.cloudera.livy</groupId>
     <artifactId>livy-api</artifactId>
     <version>0.3.0</version>
 </dependency>

 <dependency>
     <groupId>com.cloudera.livy</groupId>
     <artifactId>livy-scala-api_2.11</artifactId>
     <version>0.3.0</version>
 </dependency>

然后我在 Spark 中有以下代码,我想根据请求提交给 Livy。

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Test {

  def main(args: Array[String]) {

    val spark = SparkSession.builder()
                            .appName("Test")
                            .master("local[*]")
                            .getOrCreate()


    import spark.sqlContext.implicits._

    implicit val sparkContext = spark.sparkContext

    // ...
  }
}

使用以下代码创建LivyClient实例并将应用程序代码上传到 Spark 上下文:

val client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build()

try {
  client.uploadJar(new File(testJarPath)).get()

  client.submit(new Test())

} finally {
  client.stop(true)
}

但是,问题在于代码Test不适合与 Apache Livy 一起使用。

如何调整Test对象的代码以便能够运行client.submit(new Test())

4

1 回答 1

3

你的Test类需要实现 Livy 的Job接口并且你需要call在你的类中实现它的方法Test,从那里你可以访问 jobContext/SparkContext。然后,您可以Testsubmit方法中传递实例

您不必自己创建 SparkSession,Livy 将在集群上创建它,您可以在您的call方法中访问该上下文。

您可以在此处找到有关 Livy 编程 API 的更多详细信息:https ://livy.incubator.apache.org/docs/latest/programmatic-api.html

这是测试类的示例实现:

import com.cloudera.livy.{Job, JobContext}

class Test extends Job[Int]{

  override def call(jc: JobContext): Int = {

    val spark = jc.sparkSession()

    // Do anything with SparkSession

    1 //Return value
  }
}
于 2018-03-11T14:06:01.077 回答