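I'm trying to move data from a Data Catalog table (MySQL) into a Snowflake table using AWS Glue, in the visual mode of Glue Studio.
To do this, I'm following this guide on performing data transformations with Snowflake and AWS Glue [1].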
I follow every step of it, but when I run my job I get this error:
An error occurred while calling xxx.pyWriteDynamicFrame. No suitable driver
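(Yes, apart from the stack trace, the error message contains no more information.) I have tested everything I can think of, for example:
- the S3 bucket holding the driver is accessible
- there is network access
- I tried an incomplete JDBC URL (without the password), and the error said exactly that
- I tried entering a wrong password, and I got the corresponding error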
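One thing I noticed is that many of the issues I found were reported for AWS Glue used as a script (not the visual editor), and many of them use two jars: the Snowflake JDBC driver and the Snowflake Spark Connector. Although the tutorial I followed [1] is not very clear on this point, I tried putting both files in my "drivers" bucket, but I still get the same error.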
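I have tried many versions of both files, to no avail.
So I don't know what it could be. (I even tried a different AWS account and a different Snowflake account, so that I had full access to the resources.)
Has any of you tried this setup?
I'm using:
- AWS Glue Studio (June 2021)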
- Snowflake cloud version 5.22.1 (obtained by running select CURRENT_VERSION(); in Snowflake)
- Snowflake JDBC driver v3.13.4
- Snowflake Spark Connector v2.9.0-spark_2.4 for Scala v2.12
- My connection string:
jdbc:snowflake://xxx00000.snowflakecomputing.com/?user=${Username}&password=${Password}&warehouse=${wh}&db=${db}&schema=${schema}
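The connection string above uses ${...} placeholders. To make that concrete, here is a minimal Python sketch that fills them with made-up values and inspects the resulting URL. It assumes Glue resolves these placeholders from the connection's parameters before handing the URL to the driver; the helper names and sample values are hypothetical, and string.Template is used only because it happens to share the ${name} syntax.

```python
from string import Template
from urllib.parse import parse_qs, urlsplit

# Connection-string template as written in the question; ${...} are placeholders.
URL_TEMPLATE = (
    "jdbc:snowflake://xxx00000.snowflakecomputing.com/"
    "?user=${Username}&password=${Password}"
    "&warehouse=${wh}&db=${db}&schema=${schema}"
)


def render_jdbc_url(template: str, values: dict) -> str:
    """Fill every ${name} placeholder (hypothetical helper).

    substitute() raises KeyError if any placeholder has no value,
    so a missing parameter is caught before the URL is used.
    """
    return Template(template).substitute(values)


def looks_like_snowflake_url(url: str) -> bool:
    """Assumption: the Snowflake JDBC driver only claims URLs with this prefix."""
    return url.startswith("jdbc:snowflake://")


def query_params(url: str) -> dict:
    """Return the query parameters of a JDBC URL as a flat dict."""
    # Strip the leading "jdbc:" so urlsplit sees an ordinary URL.
    parts = urlsplit(url[len("jdbc:"):])
    return {key: vals[0] for key, vals in parse_qs(parts.query).items()}
```

For example, render_jdbc_url(URL_TEMPLATE, {"Username": "demo_user", "Password": "secret", "wh": "COMPUTE_WH", "db": "DEMO_DB", "schema": "PUBLIC"}) yields a URL whose query_params(...) contains all five keys, which is a quick way to confirm that no placeholder was left unresolved.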
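[EDIT June 23] Regarding @jonlegend's comment: I'm using the visual editor to build this job, so I have no control over the code it generates. Still, here is the generated code: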
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "mydb_name", table_name = "mytable_name", transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb_name", table_name = "mytable_name", transformation_ctx = "DataSource0")
## @type: ApplyMapping
## @args: [mappings = [("account_number", "string", "account_number", "string"), ("user_id", "int", "user_id", "int"), ("description", "string", "description", "string"), ("id", "int", "id", "int"), ("group_account_id", "int", "group_account_id", "int"), ("updated", "timestamp", "updated", "timestamp")], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = DataSource0]
Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("account_number", "string", "account_number", "string"), ("user_id", "int", "user_id", "int"), ("description", "string", "description", "string"), ("id", "int", "id", "int"), ("group_account_id", "int", "group_account_id", "int"), ("updated", "timestamp", "updated", "timestamp")], transformation_ctx = "Transform0")
## @type: DataSink
## @args: [connection_type = "custom.jdbc", connection_options = {"dbTable":"myschema.myTargetTable","connectionName":"snowflake-connection-v7"}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options = {"dbTable":"myschema.myTargetTable","connectionName":"snowflake-connection-v7"}, transformation_ctx = "DataSink0")
job.commit()
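In the generated script, each ApplyMapping tuple reads (source field, source type, target field, target type), and here every tuple keeps the same name and type, so Transform0 effectively just selects those six columns before the sink. The following is a simplified pure-Python model of that per-record behaviour, written only to illustrate the tuples; it is not Glue's implementation, and the CASTS table is a hypothetical stand-in for Glue's type conversion.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple

# (source_field, source_type, target_field, target_type), copied from the generated script.
Mapping = Tuple[str, str, str, str]

MAPPINGS: List[Mapping] = [
    ("account_number", "string", "account_number", "string"),
    ("user_id", "int", "user_id", "int"),
    ("description", "string", "description", "string"),
    ("id", "int", "id", "int"),
    ("group_account_id", "int", "group_account_id", "int"),
    ("updated", "timestamp", "updated", "timestamp"),
]

# Hypothetical converters for the target types used above.
CASTS: Dict[str, Callable[[Any], Any]] = {
    "string": str,
    "int": int,
    "timestamp": lambda v: v if isinstance(v, datetime) else datetime.fromisoformat(v),
}


def apply_mapping(record: Dict[str, Any], mappings: List[Mapping]) -> Dict[str, Any]:
    """Rename and cast fields; fields not listed in the mappings are dropped."""
    out: Dict[str, Any] = {}
    for src, _src_type, dst, dst_type in mappings:
        if src in record:  # in this model, a missing source field is simply skipped
            value = record[src]
            out[dst] = None if value is None else CASTS[dst_type](value)
    return out
```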
As for the stack trace, here it is as well:
ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
File "/tmp/test_job_v7.py", line 30, in <module>
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "custom.jdbc", connection_options =
{
"dbTable": "myschema.myTargetTable",
"connectionName": "snowflake-connection-v7"
}
, transformation_ctx = "DataSink0")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 653, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 281, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 304, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o104.pyWriteDynamicFrame.
: java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:315)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:104)
at org.apache.spark.sql.jdbc.glue.GlueJDBCOptions.<init>(GlueJDBCOptions.scala:14)
at org.apache.spark.sql.jdbc.glue.GlueJDBCOptions.<init>(GlueJDBCOptions.scala:17)
at com.amazonaws.services.glue.marketplace.partner.PartnerJDBCRecordWriterFactory.<init>(PartnerJDBCDataSink.scala:78)
at com.amazonaws.services.glue.marketplace.partner.PartnerJDBCDataSink.createWriterFactory(PartnerJDBCDataSink.scala:32)
at com.amazonaws.services.glue.marketplace.partner.PartnerJDBCDataSink.createWriterFactory(PartnerJDBCDataSink.scala:23)
at com.amazonaws.services.glue.marketplace.connector.GlueCustomDataSink.defaultWriteDynamicFrame(CustomDataSink.scala:68)
at com.amazonaws.services.glue.marketplace.connector.GlueCustomDataSink.writeDynamicFrame(CustomDataSink.scala:61)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
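The trace shows the exception raised inside Spark's JDBCOptions constructor, at a getOrElse that falls back to java.sql.DriverManager.getDriver(url). In Spark 2.4 that fallback runs only when no explicit driver class option is supplied, and DriverManager throws "No suitable driver" when none of the drivers registered with it accepts the URL. Below is a simplified Python model of that lookup, meant only to illustrate the trace; FakeDriver, the registry list, and the function names are hypothetical stand-ins, not Spark or Glue classes.

```python
from typing import Dict, List


class SQLException(Exception):
    """Stand-in for java.sql.SQLException in this model."""


class FakeDriver:
    """Hypothetical driver that accepts URLs starting with a given prefix."""

    def __init__(self, class_name: str, url_prefix: str) -> None:
        self.class_name = class_name
        self.url_prefix = url_prefix

    def accepts_url(self, url: str) -> bool:
        return url.startswith(self.url_prefix)


def get_driver(url: str, registered: List[FakeDriver]) -> FakeDriver:
    """Model of DriverManager.getDriver: first registered driver accepting the URL."""
    for driver in registered:
        if driver.accepts_url(url):
            return driver
    raise SQLException("No suitable driver")


def resolve_driver_class(options: Dict[str, str], registered: List[FakeDriver]) -> str:
    """Model of the getOrElse seen in the trace (JDBCOptions.scala:104-105)."""
    explicit = options.get("driver")
    if explicit is not None:
        return explicit  # an explicit driver class skips the DriverManager lookup
    return get_driver(options["url"], registered).class_name
```

With a driver that accepts jdbc:snowflake:// URLs in the registry, the lookup succeeds; with an empty registry and no explicit driver option, it fails with the same "No suitable driver" message as the trace.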
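Finally, I've checked both the error logs and the job logs, and the error is the same in both. The messages preceding it in the two logs don't help either.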