5

我正在尝试将 Spark 与亚马逊 Redshift 连接,但出现此错误:

在此处输入图像描述

我的代码如下:

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext(appName="Connect Spark with Redshift")
sql_context = SQLContext(sc)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", <ACCESSID>)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", <ACCESSKEY>)

df = sql_context.read \
    .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-    1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
    .option("dbtable", "table_name") \
    .option("tempdir", "bucket") \
    .load()
4

6 回答 6

11

这是连接到 redshift 的分步过程。

  • 下载红移连接器文件。试试下面的命令
wget "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.2.1.1001.jar"
  • 将以下代码保存在 python 文件(您要运行的.py)中并相应地替换凭据。
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

#initialize the spark session 
spark = SparkSession.builder.master("yarn").appName("Connect to redshift").enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sqlContext = HiveContext(sc)

sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "<ACCESSKEYID>")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "<ACCESSKEYSECTRET>")


taxonomyDf = sqlContext.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:postgresql://url.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") \
    .option("dbtable", "table_name") \
    .option("tempdir", "s3://mybucket/") \
    .load() 
  • 像下面一样运行 spark-submit
spark-submit --packages com.databricks:spark-redshift_2.10:0.5.0 --jars RedshiftJDBC4-1.2.1.1001.jar test.py
于 2017-01-06T10:01:44.780 回答
3

如果您使用 Spark 2.0.4 并在 AWS EMR 集群上运行您的代码,请按照以下步骤操作:-

1) 使用以下命令下载 Redshift JDBC jar:-

wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar

参考:- AWS 文档

2) 将下面提到的代码复制到 python 文件中,然后用您的 AWS 资源替换所需的值:-

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "access key")
spark._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "secret access key")

sqlCon = SQLContext(spark)
df = sqlCon.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["ID", "TYPE", "CODE"])

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://HOST_URL:5439/DATABASE_NAME?user=USERID&password=PASSWORD") \
  .option("dbtable", "TABLE_NAME") \
  .option("aws_region", "us-west-1") \
  .option("tempdir", "s3://BUCKET_NAME/PATH/") \
  .mode("error") \
  .save()

3) 运行以下 spark-submit 命令:-

spark-submit --name "App Name" --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 --py-files python_script.py python_script.py

笔记:-

1)在Reshift集群的安全组的入站规则中应该允许EMR节点的Public IP地址(spark-submit作业将在该节点上运行)。

2) Redshift 集群和“tempdir”下使用的 S3 位置应该在同一个地理位置。在上面的示例中,这两个资源都在 us-west-1 中。

3)如果数据是敏感的,那么请确保保护所有通道。为了确保连接安全,请按照配置中提到的 步骤进行操作

于 2019-08-20T14:34:32.357 回答
1

该错误是由于缺少依赖项。

验证您在 spark 主目录中是否有这些 jar 文件:

  1. spark-redshift_2.10-3.0.0-preview1.jar
  2. RedshiftJDBC41-1.1.10.1010.jar
  3. hadoop-aws-2.7.1.jar
  4. aws-java-sdk-1.7.4.jar
  5. (aws-java-sdk-s3-1.11.60.jar)

将这些 jar 文件放在 $SPARK_HOME/jars/ 中,然后启动 spark

pyspark --jars $SPARK_HOME/jars/spark-redshift_2.10-3.0.0-preview1.jar,$SPARK_HOME/jars/RedshiftJDBC41-1.1.10.1010.jar,$SPARK_HOME/jars/hadoop-aws-2.7.1.jar,$SPARK_HOME/jars/aws-java-sdk-s3-1.11.60.jar,$SPARK_HOME/jars/aws-java-sdk-1.7.4.jar

(SPARK_HOME 应该是 = "/usr/local/Cellar/apache-spark/$SPARK_VERSION/libexec")

这将运行带有所有必要依赖项的 Spark。请注意,如果您使用 awsAccessKeys,您还需要指定身份验证类型 'forward_spark_s3_credentials'=True。

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext(appName="Connect Spark with Redshift")
sql_context = SQLContext(sc)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", <ACCESSID>)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", <ACCESSKEY>)

df = sql_context.read \
     .format("com.databricks.spark.redshift") \
     .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central-    1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd") \
     .option("dbtable", "table_name") \
     .option('forward_spark_s3_credentials',True) \
     .option("tempdir", "s3n://bucket") \
     .load()

之后的常见错误是:

  • Redshift 连接错误:“SSL 关闭”
    • 解决方案: .option("url", "jdbc:redshift://example.coyf2i236wts.eu-central- 1.redshift.amazonaws.com:5439/agcdb?user=user&password=pwd?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory")
  • S3 错误:卸载数据时,例如在 df.show() 之后,您会收到消息:“您尝试访问的存储桶必须使用指定的端点进行寻址。请将所有未来的请求发送到此端点。”
    • 解决方案:bucket和cluster必须在同一个region内运行
于 2016-11-26T18:13:37.737 回答
0

如果您使用的是数据块,我认为您不必创建新的 sql 上下文,因为他们这样做是因为您只需使用 sqlContext,请尝试使用以下代码:

from pyspark.sql import SQLContext
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

df = sqlContext.read \ .......

也许桶没有安装

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
于 2016-07-26T19:12:39.747 回答
-1

我认为您需要添加.format("com.databricks.spark.redshift")到您的sql_context.read通话中;我的直觉是 Spark 无法推断此数据源的格式,因此您需要明确指定我们应该使用spark-redshift连接器。

有关此错误的更多详细信息,请参阅https://github.com/databricks/spark-redshift/issues/230

于 2016-10-18T21:26:09.620 回答
-1

我认为s3n://URL 样式已被弃用和/或删除。

尝试将您的密钥定义为"fs.s3.awsAccessKeyId".

于 2016-07-11T17:41:18.890 回答