0

我一直在研究基于 pyspark 的内容拉/同步工具(类似于 sqoop,但应用了一些转换作为管道)。我正在通过以下方式创建 jdbc 连接:

datatype_boundry_mappings = {
    # https://docs.microsoft.com/en-us/sql/t-sql/data-types/int-bigint-smallint-and-tinyint-transact-sql
    # We are not not using negative numbers below 10 anywhere so our lower limit does not match the sqlserver range
    'sqlserver': {
        'tinyint': {
            'lower': 0,
            'upper': 255
        },
        'smallint': {
            'lower': -10,
            'upper': 32767
        },
        'int': {
            'lower': -10,
            'upper': 2147483647
        },
        'bigint': {
            'lower': -10,
            'upper': 9223372036854775807
        }
    }
}

(...)

    jdbc_df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("user", db_username) \
        .option("password", db_password) \
        .option("numPartitions", partitions) \
        .option("partitionColumn", primary_key_column) \
        .option("lowerBound", datatype_boundry_mappings['sqlserver'][primary_key_type]['lower']) \
        .option("upperBound", datatype_boundry_mappings['sqlserver'][primary_key_type]['upper']) \
        .option("selectMethod", 'direct') \
        .load()

在 pyspark repl 中,我可以确认我有 4 个分区,但是我不知道如何判断每个分区的上/下份额是多少。

In [43]: delta_df.rdd.getNumPartitions()
Out[43]: 4

当我运行保存时,我最终会很快完成 3 个执行程序/任务,最后一个执行程序完成所有工作。当我扩展到一个非常大的(10 亿行 +)表时,同样的事情 - 200 个任务,其中 199 个在 < 100 毫秒内完成,最后一个完成所有工作。

我有几个问题:

  1. 如何调试每个分区/任务集是什么
  2. 如何查看每个任务的查询是什么(类似于 #1 )

我尝试将查询转换为 spark.sql 并创建临时视图等。那里的优化器做同样的事情。(有意义的是它共享相同的数据框/连接设置)

上游是 mssql jdbc 驱动程序 mssql-jdbc-6.2.2.jre8 , jdk 1.8.112 ( oracle )。pyspark 2.2.0.2.6.3.0-235 (hdp 2.6.3)

4

0 回答 0