如果您使用 Spark 2.0.4 并在 AWS EMR 集群上运行您的代码,请按照以下步骤操作:-
1) 使用以下命令下载 Redshift JDBC jar:-
wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar
参考:- AWS 文档
2) 将下面提到的代码复制到 python 文件中,然后用您的 AWS 资源替换所需的值:-
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "access key")
spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "secret access key")
sqlCon = SQLContext(spark)
df = sqlCon.createDataFrame([
(1, "A", "X1"),
(2, "B", "X2"),
(3, "B", "X3"),
(1, "B", "X3"),
(2, "C", "X2"),
(3, "C", "X2"),
(1, "C", "X1"),
(1, "B", "X1"),
], ["ID", "TYPE", "CODE"])
df.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://HOST_URL:5439/DATABASE_NAME?user=USERID&password=PASSWORD") \
.option("dbtable", "TABLE_NAME") \
.option("aws_region", "us-west-1") \
.option("tempdir", "s3://BUCKET_NAME/PATH/") \
.mode("error") \
.save()
3) 运行以下 spark-submit 命令:-
spark-submit --name "App Name" --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 --py-files python_script.py python_script.py
笔记:-
1)在Reshift集群的安全组的入站规则中应该允许EMR节点的Public IP地址(spark-submit作业将在该节点上运行)。
2) Redshift 集群和“tempdir”下使用的 S3 位置应该在同一个地理位置。在上面的示例中,这两个资源都在 us-west-1 中。
3)如果数据是敏感的,那么请确保保护所有通道。为了确保连接安全,请按照配置中提到的
步骤进行操作。