我一直在研究基于 pyspark 的内容拉/同步工具(类似于 sqoop,但应用了一些转换作为管道)。我正在通过以下方式创建 jdbc 连接:
datatype_boundry_mappings = {
# https://docs.microsoft.com/en-us/sql/t-sql/data-types/int-bigint-smallint-and-tinyint-transact-sql
# We are not not using negative numbers below 10 anywhere so our lower limit does not match the sqlserver range
'sqlserver': {
'tinyint': {
'lower': 0,
'upper': 255
},
'smallint': {
'lower': -10,
'upper': 32767
},
'int': {
'lower': -10,
'upper': 2147483647
},
'bigint': {
'lower': -10,
'upper': 9223372036854775807
}
}
}
(...)
jdbc_df = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", db_username) \
.option("password", db_password) \
.option("numPartitions", partitions) \
.option("partitionColumn", primary_key_column) \
.option("lowerBound", datatype_boundry_mappings['sqlserver'][primary_key_type]['lower']) \
.option("upperBound", datatype_boundry_mappings['sqlserver'][primary_key_type]['upper']) \
.option("selectMethod", 'direct') \
.load()
在 pyspark repl 中,我可以确认我有 4 个分区,但是我不知道如何判断每个分区的上/下份额是多少。
In [43]: delta_df.rdd.getNumPartitions()
Out[43]: 4
当我运行保存时,我最终会很快完成 3 个执行程序/任务,最后一个执行程序完成所有工作。当我扩展到一个非常大的(10 亿行 +)表时,同样的事情 - 200 个任务,其中 199 个在 < 100 毫秒内完成,最后一个完成所有工作。
我有几个问题:
- 如何调试每个分区/任务集是什么
- 如何查看每个任务的查询是什么(类似于 #1 )
我尝试将查询转换为 spark.sql 并创建临时视图等。那里的优化器做同样的事情。(有意义的是它共享相同的数据框/连接设置)
上游是 mssql jdbc 驱动程序 mssql-jdbc-6.2.2.jre8 , jdk 1.8.112 ( oracle )。pyspark 2.2.0.2.6.3.0-235 (hdp 2.6.3)