
Parsing a plain field (e.g. master: String = "") works, but what if that field lives inside another case class, such as the case class JobArgs(SparkArgs) given below? Then I need to parse JobArgs.


case class SparkArgs(master: String = "")  // default needed so that SparkArgs() below compiles

val parser = new scopt.OptionParser[SparkArgs]("testing") {
    head("spark-example", "2.11")

    opt[String]('c', "master").required().valueName("spark-master").
      action((x, c) => c.copy(master = x)).
      text("Setting master is required")
  }

  parser.parse(args, SparkArgs()) match {

    case Some(config) =>
    // do stuff
      println(config.master)
    case None => // failed

  }
// I am able to parse master above by running: >> run --master=local[2]

// Now, how do I parse when the parameters are case classes instead of String/Int,
// and those nested fields also need to be parsed, say scopt.OptionParser[JobArgs]?
// e.g.:

case class JobArgs(sparkArgs: SparkArgs, kafkaArgs: KafkaArgs)
case class KafkaArgs(
  kafkaPORT: String = "",
  checkpointPath: String = "src/main/resources/checkpoints"
)
case class SparkArgs(master: String = "")


// I tried:
val parser = new scopt.OptionParser[JobArgs]("testing") {
    head("spark-example", "2.11")

   // Now how do I parse the fields master and kafkaPORT here?
  }

// and run it similarly: >> run --master=local[2] --kafkaPORT=localhost:9092

1 Answer


How about:

case class JobArgs(sparkArgs: SparkArgs = SparkArgs(), kafkaArgs: KafkaArgs = KafkaArgs())

case class KafkaArgs(
                      kafkaPORT: String = "",
                      checkpointPath: String = "src/main/resources/checkpoints"
                    )

case class SparkArgs(master: String = "")

object StackoverFlow {
  def main(args: Array[String]): Unit = {

    val parser = new scopt.OptionParser[JobArgs]("testing") {
      head("spark-example", "2.11")

      opt[String]("master")
        .required()
        .valueName("spark-master")
        .action((master, c) => c.copy(sparkArgs = SparkArgs(master)))
        .text("All necessary benchmark topics get created.")

      opt[String]("kafkaPort")
        .required()
        .valueName("kafka-port")
        .action((kafkaPort, c) => c.copy(kafkaArgs = KafkaArgs(kafkaPort)))
    }

    parser.parse(args, JobArgs()) match {
      case Some(c) => println(c)
      case _ => sys.exit(1)
    }
  }
}

Running it with the arguments --master FCM --kafkaPort 1965 produces the following output:

JobArgs(SparkArgs(FCM),KafkaArgs(1965,src/main/resources/checkpoints))
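One thing to note about this flat approach: c.copy(kafkaArgs = KafkaArgs(kafkaPort)) builds a brand-new KafkaArgs, which is fine while --kafkaPort is the only flag targeting it. If you add more flat options for the same nested case class, copy into the existing value instead so that earlier options are not overwritten. A minimal sketch of that, placed inside the same OptionParser[JobArgs] block (the extra --checkpointPath flag is only illustrative, not part of the answer above):

// Sketch: flat options that each update one field of the nested KafkaArgs.
// Copying from c.kafkaArgs (rather than building KafkaArgs(...) from scratch)
// preserves values set by other options as well as the remaining defaults.
opt[String]("kafkaPort")
  .required()
  .valueName("kafka-port")
  .action((x, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(kafkaPORT = x)))

opt[String]("checkpointPath")   // illustrative extra flag
  .action((x, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(checkpointPath = x)))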

If you also want to pass checkpointPath, you can do it as follows:

case class JobArgs(sparkArgs: SparkArgs = SparkArgs(), kafkaArgs: KafkaArgs = KafkaArgs())

case class KafkaArgs(
                      kafkaPORT: String = "",
                      checkpointPath: String = "src/main/resources/checkpoints"
                    )

case class SparkArgs(master: String = "")

object StackoverFlow {
  def main(args: Array[String]): Unit = {

    val parser = new scopt.OptionParser[JobArgs]("testing") {
      head("spark-example", "2.11")

      opt[String]("master")
        .required()
        .valueName("spark-master")
        .action((master, c) => c.copy(sparkArgs = SparkArgs(master)))
        .text("All necessary benchmark topics get created.")

      opt[Unit]("kafka")
        .action((_, c) => c.copy(kafkaArgs = KafkaArgs()))
        .children(
          opt[String]('p', "port")
            .required()
            .action((x, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(kafkaPORT = x))),
          opt[String]('c', "checkpointPath")
            .required()
            .action((x, c) => c.copy(kafkaArgs = c.kafkaArgs.copy(checkpointPath = x)))
        )
    }

    parser.parse(args, JobArgs()) match {
      case Some(c) => println(c)
      case _ => sys.exit(1)
    }
  }
}

Accordingly, running it with the following arguments:

--master FC --kafka --port 1965 --checkpointPath Mag/de/burg

produces the following output:

JobArgs(SparkArgs(FC),KafkaArgs(1965,Mag/de/burg))
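For completeness, here is a rough idea of how the parsed JobArgs might be consumed afterwards. This is only a hypothetical sketch, not part of the answer: the SparkSession part assumes a spark-sql dependency on the classpath, and the app name and println are made up for illustration.

import org.apache.spark.sql.SparkSession

// Hypothetical downstream use of the parsed JobArgs.
def runJob(jobArgs: JobArgs): Unit = {
  val spark = SparkSession.builder()
    .master(jobArgs.sparkArgs.master)   // e.g. local[2]
    .appName("spark-example")           // illustrative name
    .getOrCreate()

  // kafkaPORT would typically feed the Kafka bootstrap-servers setting,
  // and checkpointPath a streaming checkpoint location.
  println(s"Kafka at ${jobArgs.kafkaArgs.kafkaPORT}, checkpoints in ${jobArgs.kafkaArgs.checkpointPath}")
  spark.stop()
}

// Wired into the match from the answer:
// parser.parse(args, JobArgs()) match {
//   case Some(c) => runJob(c)
//   case _       => sys.exit(1)
// }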

Answered on 2019-08-21T12:01:49.263