1

我正在尝试执行以下操作。

通过 jdbc 从数据库中读取数据。在 pyspark 中,我可以使用以下语法来做到这一点。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
val finalDataFrame = sparknew.read.format("jdbc").option("url", "").option("dbtable", "").option("driver", "oracle.jdbc.OracleDriver").option("user", "").option("password", "").load()

现在我想在SCALA中做类似的事情。

我尝试了以下方法:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.DataFrame


object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val sparknew = glueContext.getSparkSession

    val df_fileInterfacemap = sparknew.read.format("jdbc").option("url", "<url>").option("dbtable", "tablename").option("nullValue", "null").option("user", "<password>").option("password", "<password>").load()
    val dynamicframe_new = DynamicFrame(df_fileInterfacemap, glueContext)
    val datasink2 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://<S3 Path>"}"""), transformationContext = "datasink2", format = "csv").writeDynamicFrame(dynamicframe_new)
    Job.commit()
  }
}

但我收到以下错误。

java.sql.SQLRecoverableException IO 错误:网络适配器无法建立连接

我不想使用胶水 api 提供的爬虫而不是 getSource 方法。任何帮助,将不胜感激。

4

1 回答 1

3

我能够使用以下代码完成任务:

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val sparknew = glueContext.getSparkSession

    val props = new Properties()
    props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
    props.setProperty("user", "<USERNAME>")
    props.setProperty("password", "<PASSWORD>")
    props.setProperty("nullValue", "null")
    val url = "jdbc:oracle:thin:@HOSTNAME:PORT:INSTANCENAME"
    val table = "SCHEMA.TABLE"
    val df_fileInterfacemap = sparknew.read.jdbc(url, table, props)

希望这会对某人有所帮助。

于 2018-02-15T00:01:06.333 回答