问题标签 [pyspark-sql]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3562 浏览

python - 使用 PySpark 迭代 SQL.Row 列表

我有一个看起来像这样的 Spark.SQL.Row:

我想使用以下方法从每个嵌套行中获取值:

问题是当我迭代时,整行都被转换成元组,

我失去了架构。有没有办法迭代和保留行列表的模式?

0 投票
1 回答
1332 浏览

apache-spark - Spark SQL 窗口函数/lag() 给出了意想不到的结果

编辑:经过更多的故障排除后,我发现了以下内容 - 我将 lag(event_time) 添加到查询中以查看查询收集的日期并且得到同样奇怪的结果:

对于某些 device_ids,这会按预期返回,但是某些 device_ids 会返回以下内容:

看起来当 device_id 的前两个事件具有相同的时间戳时会发生这种情况。希望这能给某人一个线索?


OP

我正在尝试使用 Spark SQL 中的 lag() 函数来确定表中两个后续事件之间的时间长度。重要的列是 device_id、文本列、unix_time、数字时间戳和 event_id,每行都是唯一的。

我正在运行的查询:

在 Postgres 中,这给出了预期的结果 - 但是当在 Pyspark 中运行时,任何时候有两个事件具有相同的时间戳, seconds_since_last_event 被计算为一个很大的数字,即 -1435151676846888 或 -1431583545415023 或 25534 - 我不知道在哪里这些数字来自。

我尝试在查询中添加一个 if() 语句,如

但我得到了相同的结果。有什么想法可能导致这种情况吗?

0 投票
0 回答
396 浏览

python - RDD映射创建层次树

如何构造一个 table/rdd 以便它描述来自 parent 的级别。下面给出的是“:”之后的节点及其对应的子节点

对于上表,如果我的源输入为 2,目标输入为 5,iIconstruct 如何构造一个 rdd 使其看起来像

其中:旁边的数字表示父节点,下一个数字表示根节点的级别。

0 投票
1 回答
329 浏览

mysql - 如何从 sql 中的 `describe` 语句创建一个新表?

我想创建一个包含列名行的新表。本质上,我想将decribe语句的输出放入一个新表中,以便我可以使用where子句仅提取某些列名。我正在使用sparksql。

我该怎么做呢?谢谢。

0 投票
1 回答
1726 浏览

python - PySpark jdbc 谓词错误:Py4JError: An error occurred while calling o108.jdbc

我正在尝试在我的DataFrameReader.jdbc()方法中使用谓词:

但是,我遇到以下错误:

我应该如何向 jdbc 方法调用添加谓词?

0 投票
3 回答
46409 浏览

python - Pyspark数据框:对一列求和,同时对另一列进行分组

我有一个如下的数据框

我想做的是为第一列的每个不同值计算第二列对应值的总和。我尝试使用以下代码执行此操作:

这给出了一个输出

我不太确定它是否做对了。为什么它不显示第一列的信息?提前感谢您的回答

0 投票
1 回答
653 浏览

hadoop - 如何从 pyspark rdd 或分区中确定原始 s3 输入文件名

我正在使用 pyspark 流式传输到来自 S3 的 ETL 输入文件。

我需要能够在 s3:// 上构建所有原始输入文件的审计跟踪,并且我的 parquet 输出最终在 hdfs:// 上。

给定一个 dstream、rdd 甚至特定的 rdd 分区,是否可以确定 s3 中输入数据的原始文件名?

目前我知道这样做的唯一方法是采用 rdd.toDebugString()并尝试解析它。然而,这感觉真的很hacky并且在某些情况下不起作用。例如,解析调试输出不适用于我也在做的批处理模式导入(使用sc.TextFile("s3://...foo/*")样式 glob)。

有没有人有确定原始文件名的理智方法?

似乎其他一些 spark 用户过去也有过这个问题,例如:

http://apache-spark-user-list.1001560.n3.nabble.com/Access-original-filename-in-a-map-function-tt2831.html

谢谢!

0 投票
1 回答
9662 浏览

apache-spark - 如何在 pyspark 中加载 gzip 压缩的 csv 文件?

文件名不以结尾,.gz我无法将它们改回来,因为它们与其他程序共享。

file1.log.gz.processed只是一个csv文件。但我如何阅读它pyspark,最好是pyspark.sql

我试图指定格式和压缩,但找不到正确的键/值。例如,

sqlContext.load(fn, format='gz')

没用。虽然 Spark 可以处理gz文件,但它似乎是根据文件名确定编解码器。例如,

sc.textFile(fn)

如果文件以.gz但不是我的情况结束,则可以使用。

如何指示 Spark 使用正确的编解码器?谢谢!

0 投票
1 回答
896 浏览

apache-spark - spark SQL GROUP BY 聚合引发错误“列表索引超出范围”

我在 spark 1.5.0 中有一个包含两列的数据框。以下查询正常工作: sqlContext.sql("select id, value from table").show()

但是做聚合失败了:

sqlContext.sql("select id, count(value) from table group by id").show()

返回错误:

WARN TaskSetManager: Lost task 13.0 in stage 10.0: Traceback...

IndexError: list Index out of range

count故意使用确实排除了错误类型的可能性。所有列都被视为字符串(尽管有些是数字)。我叫错了吗?

0 投票
4 回答
4071 浏览

python - 如何在保留现有架构的同时从行中创建 DataFrame?

如果我调用 map 或mapPartition并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?

目前我做类似的事情: