
I am trying to move data from a Data Catalog table (MySQL) into a Snowflake table using AWS Glue (in the visual mode of Glue Studio).

To do this, I am following this guide: Performing data transformations using Snowflake and AWS Glue [1]

I am following every part of it, but when I run my job I get the error:

An error occurred while calling xxx.pyWriteDynamicFrame. No suitable driver

(Yes, there is no more information in the error message besides the stack trace.) I have already tested everything I can think of, for example:

  • the S3 bucket with the driver is accessible
  • there is network access
  • I tried an incomplete JDBC URL (without the password) and the error said so
  • I tried a wrong password and got the corresponding error

One thing I noticed is that most of the similar issues I found are reported for AWS Glue used as a script (not with the visual editor), and in many of them two JARs are used: the Snowflake JDBC driver and the Snowflake Spark Connector. Even though the tutorial I am following [1] is not very clear about this, I tried putting both files in my "drivers" bucket, but I still get the same error.
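
For reference, the script-based examples I found do not go through the generic JDBC sink at all; they put both JARs on the job classpath (for instance with the --extra-jars job parameter) and write through the Spark connector. A rough sketch of that approach, with placeholder credentials and table names rather than my actual values, looks like this:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Source name registered by the Snowflake Spark connector
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Placeholder connection options -- every value here is an example, not my real setup
sf_options = {
    "sfURL": "xxx00000.snowflakecomputing.com",
    "sfUser": "myuser",
    "sfPassword": "mypassword",
    "sfDatabase": "mydb",
    "sfSchema": "myschema",
    "sfWarehouse": "mywh",
}

# Read the source table from the Data Catalog, then write it to Snowflake via the connector
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="mydb_name", table_name="mytable_name")

datasource.toDF() \
    .write \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sf_options) \
    .option("dbtable", "myschema.myTargetTable") \
    .mode("append") \
    .save()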

I have tried many versions of both files, to no avail.

So I have no idea what it could be (I even tried with a different AWS account and a different Snowflake account, so that I would have full access to the resources).

Have any of you tried this setup?

I am using:

  • AWS Glue Studio (June 2021)
  • Snowflake cloud version 5.22.1 (obtained with select CURRENT_VERSION(); in Snowflake)
  • Snowflake JDBC driver v3.13.4
  • Snowflake Spark Connector v2.9.0-spark_2.4 for Scala v2.12
  • My connection string: jdbc:snowflake://xxx00000.snowflakecomputing.com/?user=${Username}&password=${Password}&warehouse=${wh}&db=${db}&schema=${schema}

[EDIT June 23] Regarding @jonlegend's comment: I am using the visual editor for this job, so I have no control over the code implementation. Still, I will post the generated code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "mydb_name", table_name = "mytable_name", transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb_name", table_name = "mytable_name", transformation_ctx = "DataSource0")
## @type: ApplyMapping
## @args: [mappings = [("account_number", "string", "account_number", "string"), ("user_id", "int", "user_id", "int"), ("description", "string", "description", "string"), ("id", "int", "id", "int"), ("group_account_id", "int", "group_account_id", "int"), ("updated", "timestamp", "updated", "timestamp")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("account_number", "string", "account_number", "string"), ("user_id", "int", "user_id", "int"), ("description", "string", "description", "string"), ("id", "int", "id", "int"), ("group_account_id", "int", "group_account_id", "int"), ("updated", "timestamp", "updated", "timestamp")], transformation_ctx = "Transform0")
## @type: DataSink
## @args: [connection_type = "custom.jdbc", connection_options = {"dbTable":"myschema.myTargetTable","connectionName":"snowflake-connection-v7"}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options = {"dbTable":"myschema.myTargetTable","connectionName":"snowflake-connection-v7"}, transformation_ctx = "DataSink0")
job.commit()

Also, regarding the stack trace, I can share it as well:

ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/test_job_v7.py", line 30, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options = 
{
    "dbTable": "myschema.myTargetTable",
    "connectionName": "snowflake-connection-v7"
}
, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 653, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 281, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 304, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o104.pyWriteDynamicFrame.
: java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:104)
    at org.apache.spark.sql.jdbc.glue.GlueJDBCOptions.<init>(GlueJDBCOptions.scala:14)
    at org.apache.spark.sql.jdbc.glue.GlueJDBCOptions.<init>(GlueJDBCOptions.scala:17)
    at com.amazonaws.services.glue.marketplace.partner.PartnerJDBCRecordWriterFactory.<init>(PartnerJDBCDataSink.scala:78)
    at com.amazonaws.services.glue.marketplace.partner.PartnerJDBCDataSink.createWriterFactory(PartnerJDBCDataSink.scala:32)
    at com.amazonaws.services.glue.marketplace.partner.PartnerJDBCDataSink.createWriterFactory(PartnerJDBCDataSink.scala:23)
    at com.amazonaws.services.glue.marketplace.connector.GlueCustomDataSink.defaultWriteDynamicFrame(CustomDataSink.scala:68)
    at com.amazonaws.services.glue.marketplace.connector.GlueCustomDataSink.writeDynamicFrame(CustomDataSink.scala:61)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Finally, I have looked at both the error log and the job log, and the error is the same. The earlier messages in both logs are not helpful either.


1 Answer


Some suggestions:
First, make sure your JDBC driver is referenced in your code (I am not sure how to do this in the visual editor, but in code you would change the following line):

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options = {"dbTable":"myschema.myTargetTable","connectionName":"snowflake-connection-v7"}, transformation_ctx = "DataSink0")

to:

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options = { 
  "dbTable":"myschema.myTargetTable",
  "connectionName":"snowflake-connection-v7",
  "customJdbcDriverS3Path": "Amazon S3 path of the custom JDBC driver",
  "customJdbcDriverClassName":"class name of the driver"
}, transformation_ctx = "DataSink0")
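
For the Snowflake JDBC driver specifically, the filled-in options could look something like the following; the S3 path is only an example location, while net.snowflake.client.jdbc.SnowflakeDriver is the driver class contained in the Snowflake JDBC JAR:

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options = { 
  "dbTable":"myschema.myTargetTable",
  "connectionName":"snowflake-connection-v7",
  "customJdbcDriverS3Path": "s3://somebucket/aws-glue-drivers/snowflake-jdbc-3.13.4.jar",
  "customJdbcDriverClassName":"net.snowflake.client.jdbc.SnowflakeDriver"
}, transformation_ctx = "DataSink0")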

Also make sure your Glue job has IAM permissions for the S3 folder where the driver resides. If you are using the default service role (AWSGlueServiceRole), just make sure the string "aws-glue-" appears somewhere in the S3 path, for example "s3://somebucket/aws-glue-drivers/mydriver.jar".
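
If you need to grant that access explicitly rather than rely on the role's existing permissions, a minimal sketch of attaching an inline read policy to the job's role with boto3 could look like this (the role name, policy name, and bucket/prefix are placeholders, not values from your setup):

import json
import boto3

iam = boto3.client("iam")

# Placeholder bucket/prefix -- the folder that holds the driver JAR(s)
driver_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:ListBucket"],
        "Resource": [
            "arn:aws:s3:::somebucket",
            "arn:aws:s3:::somebucket/aws-glue-drivers/*"
        ]
    }]
}

# Placeholder role and policy names
iam.put_role_policy(
    RoleName="AWSGlueServiceRole",
    PolicyName="glue-driver-s3-read",
    PolicyDocument=json.dumps(driver_policy)
)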

Answered on 2021-06-25T16:44:40.070