我正在尝试执行以下操作。
通过 jdbc 从数据库中读取数据。在 pyspark 中,我可以使用以下语法来做到这一点。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
## @params: [JOB_NAME]
# Resolve the JOB_NAME argument passed to the script.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
# Spark session exposed by the Glue context; used below for the JDBC read.
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read the table over JDBC into a DataFrame.
# Plain Python assignment (no `val` keyword), and the reader comes from the
# `spark` session bound above -- no `sparknew` name exists in this script.
finalDataFrame = (
    spark.read.format("jdbc")
    .option("url", "")
    .option("dbtable", "")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("user", "")
    .option("password", "")
    .load()
)
现在我想在 Scala 中做类似的事情。
我尝试了以下方法:
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.DataFrame
/**
 * AWS Glue job that loads one table over JDBC through the Spark session
 * and writes the result to S3 as CSV.
 */
object GlueApp {

  /**
   * Job entry point.
   *
   * @param sysArgs raw job arguments; `JOB_NAME` is resolved from them
   */
  def main(sysArgs: Array[String]): Unit = {
    val sparkContext: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sparkContext)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val sparkSession = glueContext.getSparkSession

    // NOTE(review): the reported SQLRecoverableException ("network adapter could
    // not establish the connection") is raised while opening the connection to
    // the database, so it is presumably a connectivity problem (host/port in the
    // URL, or the job running without a network route to the database) rather
    // than a problem with this reader -- confirm against the job's network setup.
    val fileInterfaceMapDf: DataFrame = sparkSession.read
      .format("jdbc")
      .option("url", "<url>")
      .option("dbtable", "tablename")
      // Driver class named explicitly, matching the equivalent PySpark reader.
      .option("driver", "oracle.jdbc.OracleDriver")
      // NOTE(review): "nullValue" is not a documented JDBC data source option;
      // kept as in the original -- confirm whether it is needed.
      .option("nullValue", "null")
      // The user option takes the database user name, not the password.
      .option("user", "<user>")
      .option("password", "<password>")
      .load()

    val dynamicFrame = DynamicFrame(fileInterfaceMapDf, glueContext)

    // Write the frame to S3 as CSV; the value returned by the write is not used.
    glueContext
      .getSinkWithFormat(
        connectionType = "s3",
        options = JsonOptions("""{"path": "s3://<S3 Path>"}"""),
        transformationContext = "datasink2",
        format = "csv"
      )
      .writeDynamicFrame(dynamicFrame)

    Job.commit()
  }
}
但我收到以下错误:
java.sql.SQLRecoverableException: IO 错误:网络适配器无法建立连接(IO Error: The Network Adapter could not establish the connection)
我不想使用爬虫(crawler),也不想使用 AWS Glue API 提供的 getSource 方法。如能提供任何帮助,将不胜感激。