问题标签 [spark-jdbc]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1817 浏览

pyspark - 使用 spark JDBC 从 Oracle 表加载数据非常慢

我正在尝试使用 spark jdbc 从表中读取 5 亿条记录,然后对该表进行性能连接。当我从 sql developer 执行 sql 时,需要 25 分钟。但是,当我使用 spark JDBC 加载它时,上次它运行了 18 个小时,然后我取消了它。我为此使用 AWS-GLUE。

这就是我使用 spark jdbc 阅读的方式

我使用了 partitionColumn OUTSTANDING_ACTIONS,这是数据分布列 1 是 partitionColumn,第二列是它们的出现

这是我的加入,其中 customer_caseOnpremView 表加载需要超过 18 小时,其他两个表需要 1 分钟

请建议如何使其快速。我没有从 10 到 40 的工人 我使用 Executor 类型标准到 GP2 最大的标准,但对工作没有影响

0 投票
2 回答
1192 浏览

apache-spark - 编写数据帧时从 Spark 到数据库的连接有多少?

我很困惑在以下场景中 Spark 会与数据库建立多少连接:

假设我有一个 Spark 程序,它只在一个带有一个执行程序的工作节点上运行,并且数据帧中的分区数为 10。我想将此数据帧写入 Teradata。既然并行度是10,而executor只有1,那么保存数据的时候是10个连接,还是只有1个连接?

0 投票
1 回答
292 浏览

postgresql - 使用 spark 从 Postgres 并行读取数据,用于没有整数主键列的表

我正在努力从 PostGres 表中读取数据,该表包含特定季度的 1.02 亿条记录。该表包含多个季度的数据。现在我正在通过 spark JDBC 连接器读取数据,获取数据需要花费太多时间。当我对数据框执行操作(如 Count())时,加载数据几乎需要 15-20 分钟。数据正在加载或在单个任务上处理,所以我想并行处理/读取数据。

我正在使用下面的代码来获取数据并创建连接:

我知道如果您指定分区列并指定下限和上限并且分区列需要是整数,我们可以并行读取数据,但在我的情况下,我没有任何整数类型的列。主键也是 GUID 类型。

我可以更快地读取数据或读取并行任务数据的任何方式对我都有帮助。关于我是否可以使用任何具有该功能的第三方或我可以使用本机 JDBC 连接器的任何方式的任何建议。

0 投票
1 回答
70 浏览

apache-spark - 我希望 Spark 在保存到数据库时忽略不良记录

我正在使用 spark JDBC 将行保存在数据库中。数据的保存工作正常。

问题:如果遇到任何错误记录,Spark 会中止保存(例如,当表期望非空值时,一列具有空值)

我想要什么:我希望 Spark 忽略坏行并继续保存下一行。这怎么可能实现?我在文档中看不到太多。使用StructType不是一种选择。

任何指针?

我的代码看起来像这样。

0 投票
1 回答
308 浏览

postgresql - 加速 spark df.write 到 PostgreSQL 的最佳参数

我正在尝试将约 300 万行 x 158 列(约 3GB)的 Pyspark 数据帧写入 TimeScale DB。

写入操作是从具有以下资源的 Jupyter 内核执行的:

  • 1 个驱动程序,2 个 vcpu,2GB 内存
  • 2 个执行器,2 个 vcpu,4GB 内存

正如人们所预料的那样,它相当慢。

我知道repartitionand batchsize,所以我试图使用这些参数来加快写入操作,但我想知道什么是尽可能高性能的最佳参数。

df.rdd.getNumPartitions()是7,我应该尝试增加还是减少分区数?我试着玩了一下,但没有得到任何结论性的结果。增加分区数量似乎确实会减慢写入速度,但这可能只是因为 Sparkrepartition先执行。

我更具体地想知道batchsize. 我猜最佳批量大小取决于 TimeScale/Postgre 配置,但我无法找到有关此的更多信息。

作为记录,这是我尝试过的一个示例:

这在数据帧的小得多的样本(约 500K 行,500MB)上花费了 26 分钟。

我们知道我们的 Jupyter 内核缺乏资源,并且也在努力解决这个问题,但是有没有办法使用 Spark 和 TimeScale 参数优化写入速度?

[编辑] 我也阅读了这个关于使用 COPY 的非常有用的答案,但我们现在正在专门寻找使用 Spark 提高性能的方法。

0 投票
1 回答
338 浏览

apache-spark-sql - Spark JDBC 写入 Teradata:由于死锁错误导致多个 Spark 任务失败,Transaction ABORTed 导致 Stage 失败

我正在使用 spark JDBC write 将数据从配置单元加载到 teradata 视图。我正在使用 200 个 vcore 并将数据划分为 10000 个分区。

Spark 任务失败并出现以下错误,导致阶段失败。有时应用程序成功完成但有一些重复记录

由:java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.10] [Error 2631] [SQLState 40001] Transaction ABORTed 由于死锁。

以下是我使用的代码:

val df = spark.sql("select * from hive table").distinct.repartition(10000).write.mode(overwrite) .option("truncate", Truncate).j​​dbc(url,dbTable, dproperties)

Teradata 视图是使用“AS LOCKING ROW FOR ACCESS”创建的。该表还有一个唯一的 PI。

我无法弄清楚为什么某些 spark 任务因死锁错误而失败,有没有办法可以阻止我的整个 spark 应用程序因任务失败而失败。

0 投票
0 回答
143 浏览

postgresql - 并行读取数据时,Spark 不会将过滤器下推到 PostgreSQL 数据源,提供下限和上限的值

我正在尝试从 PostgreSQL 表中并行读取数据。我使用时间戳列作为分区列,并提供下限、上限和 numPartitions 的值。它正在创建多个查询以并行读取数据,但并未将过滤器下推到 PostgreSQL 数据库。当我在数据框上使用解释命令时,物理计划中的推送过滤器中没有任何内容。我也尝试在加载方法之后应用过滤器子句,但它仍然没有按下过滤器。

选项1:这里我没有使用过滤条件

解释计划输出

现在,如果我在 df 上进行解释,则推送的过滤器中没有任何内容,但是我能够使用 pg_stat_activity 从 PostgreSQL 获取的查询显示了 12 个具有 where 条件的不同查询。我在这里提供一个查询。

我在这里有点困惑,无论是过滤 PostgreSQL 中的记录还是在 spark 中根据解释计划进行过滤,您在推送的过滤器中没有任何内容,但根据生成的查询,它看起来像是在过滤PostgreSQL 中的数据。

选项 2:使用过滤条件

解释上述数据框的计划

使用 pg_stat_activity 来自 PostgreSQL 的查询之一

我想了解的是,为什么在提供分区列和下限和上限时,它没有将过滤器推送到数据库,而是在通过将值转换为时间戳来应用显式过滤器之后,它会将过滤器向下推。框架也不应该足够聪明,将我们传递的值视为下限和上限,以将其视为时间戳列的范围。

如果您有大量数据需要在过滤条件之后读取,那么最有效的处理方法是什么?

0 投票
1 回答
414 浏览

apache-spark - PySpark pyspark.sql.DataFrameReader.jdbc() 不接受日期时间类型上限参数,如文档所述

PySpark 3.0.1我在 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader的 jdbc 函数文档中找到,它说:

column – 将用于分区的数字、日期或时间戳类型的列的名称。

我认为它接受一个日期时间列来对查询进行分区。

所以我在EMR-6.2.0(PySpark 3.0.1)上尝试了这个:

我收到了这个错误:

我在这里查看了源代码 https://github.com/apache/spark/blob/master/python/pyspark/sql/readwriter.py#L865 发现它不支持文档所说的日期时间类型。

我的问题是:

如代码所示,它不支持 PySpark 中的 datetime 类型分区列,但为什么文档说它支持呢?

谢谢,

0 投票
0 回答
185 浏览

apache-spark - 如何使用在 GCP 上运行而不使用安全套接字层的 Pyspark 连接到 SQL Server?

我正在尝试使用 PySpark 连接到 SQL Server 数据库,如下所示:

我正在从 Google Cloud Platform 运行此代码。我有一个 Dataproc 实例,我在其中为此操作创建了一个集群并在那里提交我的作业。作业失败,但有以下异常:

py4j.protocol.Py4JJavaError: 调用 o70.load 时出错。
: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法使用安全套接字层 (SSL) 加密建立与 SQL Server 的安全连接。错误:“连接重置 ClientConnectionId:1223412f-9879702-wfwd-134qq-2143d123e1q”。

在 com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3208)
在 com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1916)
在 com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper (SQLServerConnection.java:2760)
在 com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2418) 在 com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2265) 在 com.microsoft.sqlserver.jdbc.SQLServerConnection.connect (SQLServerConnection.java:1291) at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:881) at java.lang.Thread.run(Thread.java:748) 原因:java.io.IOException:连接重置ClientConnectionId:1223412f-9879702-wfwd-134qq-2143d123e1q。在 com.microsoft.sqlserver.jdbc.TDSChannel$SSLHandshakeInputStream.readInternal(IOBuffer.java:862) 在 com.microsoft.sqlserver.jdbc.TDSChannel$SSLHandshakeInputStream.read(IOBuffer.java:849) 在 com.microsoft.sqlserver.jdbc .TDSChannel$ProxyInputStream.readInternal(IOBuffer.java:1019) 在 com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream。

回溯(最近一次通话最后):

文件“/tmp/portw/pattern.py”,第 24 行,在
connect_and_read(spark)

文件“/tmp/portw/pattern.py”,第 18 行,在 connect_and_read
traceback.print_exc(type(ex), ex, ex. traceback )

文件“/opt/conda/default/lib/python3.8/traceback.py”,第 163 行,在 print_exc
print_exception(*sys.exc_info(), limit=limit, file=file, chain=chain)

文件“/opt/conda/default/lib/python3.8/traceback.py”,第 103
行,在 TracebackException 中的 print_exception 行(

文件“/opt/conda/default/lib/python3.8/traceback.py”,第 509 行,在init
self.stack = StackSummary.extract(


如果限制> = 0 ,则提取文件“/opt/conda/default/lib/python3.8/traceback.py”,第 340 行:

TypeError: 'type' 和 'int' 的实例之间不支持 '>='

21/02/09 13:59:59 信息 org.sparkproject.jetty.server.AbstractConnector: 停止 Spark@1aa73a6d{HTTP/1.1, (http/1.1)}

早些时候,该 URL 不包含encrypt=false,我在查看了一些参考资料后添加了它。

我可以用纯 Python 代码连接到我的 API 的同一主机,但不能在 Spark 中连接。

谁能让我知道我在这里犯了什么错误以及如何纠正它。

任何帮助表示赞赏。

0 投票
2 回答
4362 浏览

python - 为什么 PostgreSQL 说 FATAL:抱歉,当我离最大连接数还很远时,客户端已经太多了?

我正在安装 PostgreSQL 11.2,它会定期在其系统日志中抱怨

尽管远未接近其配置的连接限制。这个查询:

告诉我数据库配置为最多 100 个连接。我从未见过超过 45 个使用此查询连接到数据库的连接,甚至在正在运行的程序收到数据库错误之前的片刻,即在 Postgres 日志中上述消息支持的客户端过多。

绝对我可以在 Internet 上找到的所有问题都表明该错误意味着您已超出max_connections设置,但数据库本身告诉我我没有。

值得一提的是,pyspark 是唯一触发此错误的数据库客户端,并且仅当它从数据帧写入表时。使用psycopg2(即主客户端)的常规 python 代码永远不会触发它(即使从 Pandas 数据帧以相同的方式写入表时也不会),并且像 pgAdmin 这样的管理工具也永远不会触发它。如果我没有直接在数据库日志中看到错误,我会认为 Spark 在对我撒谎。大多数时候,如果我使用这样的查询:

然后问题消失了几天。但就像我说的那样,根据数据库本身,我从来没有见过假设的最大 100 个连接的 50% 正在使用中。如何找出导致此错误的原因?