问题标签 [pyspark-sql]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 使用 PySpark 迭代 SQL.Row 列表
我有一个看起来像这样的 Spark.SQL.Row:
我想使用以下方法从每个嵌套行中获取值:
问题是当我迭代时,整行都被转换成元组,
我失去了架构。有没有办法迭代和保留行列表的模式?
apache-spark - Spark SQL 窗口函数/lag() 给出了意想不到的结果
编辑:经过更多的故障排除后,我发现了以下内容 - 我将 lag(event_time) 添加到查询中以查看查询收集的日期并且得到同样奇怪的结果:
对于某些 device_ids,这会按预期返回,但是某些 device_ids 会返回以下内容:
看起来当 device_id 的前两个事件具有相同的时间戳时会发生这种情况。希望这能给某人一个线索?
OP
我正在尝试使用 Spark SQL 中的 lag() 函数来确定表中两个后续事件之间的时间长度。重要的列是 device_id、文本列、unix_time、数字时间戳和 event_id,每行都是唯一的。
我正在运行的查询:
在 Postgres 中,这给出了预期的结果 - 但是当在 Pyspark 中运行时,任何时候有两个事件具有相同的时间戳, seconds_since_last_event 被计算为一个很大的数字,即 -1435151676846888 或 -1431583545415023 或 25534 - 我不知道在哪里这些数字来自。
我尝试在查询中添加一个 if() 语句,如
但我得到了相同的结果。有什么想法可能导致这种情况吗?
python - RDD映射创建层次树
如何构造一个 table/rdd 以便它描述来自 parent 的级别。下面给出的是“:”之后的节点及其对应的子节点
对于上表,如果我的源输入为 2,目标输入为 5,iIconstruct 如何构造一个 rdd 使其看起来像
其中:旁边的数字表示父节点,下一个数字表示根节点的级别。
mysql - 如何从 sql 中的 `describe` 语句创建一个新表?
我想创建一个包含列名行的新表。本质上,我想将decribe
语句的输出放入一个新表中,以便我可以使用where
子句仅提取某些列名。我正在使用sparksql。
我该怎么做呢?谢谢。
python - Pyspark数据框:对一列求和,同时对另一列进行分组
我有一个如下的数据框
我想做的是为第一列的每个不同值计算第二列对应值的总和。我尝试使用以下代码执行此操作:
这给出了一个输出
我不太确定它是否做对了。为什么它不显示第一列的信息?提前感谢您的回答
hadoop - 如何从 pyspark rdd 或分区中确定原始 s3 输入文件名
我正在使用 pyspark 流式传输到来自 S3 的 ETL 输入文件。
我需要能够在 s3:// 上构建所有原始输入文件的审计跟踪,并且我的 parquet 输出最终在 hdfs:// 上。
给定一个 dstream、rdd 甚至特定的 rdd 分区,是否可以确定 s3 中输入数据的原始文件名?
目前我知道这样做的唯一方法是采用
rdd.toDebugString()
并尝试解析它。然而,这感觉真的很hacky并且在某些情况下不起作用。例如,解析调试输出不适用于我也在做的批处理模式导入(使用sc.TextFile("s3://...foo/*")
样式 glob)。
有没有人有确定原始文件名的理智方法?
似乎其他一些 spark 用户过去也有过这个问题,例如:
谢谢!
apache-spark - 如何在 pyspark 中加载 gzip 压缩的 csv 文件?
文件名不以结尾,.gz
我无法将它们改回来,因为它们与其他程序共享。
file1.log.gz.processed
只是一个csv
文件。但我如何阅读它pyspark
,最好是pyspark.sql
?
我试图指定格式和压缩,但找不到正确的键/值。例如,
sqlContext.load(fn, format='gz')
没用。虽然 Spark 可以处理gz
文件,但它似乎是根据文件名确定编解码器。例如,
sc.textFile(fn)
如果文件以.gz
但不是我的情况结束,则可以使用。
如何指示 Spark 使用正确的编解码器?谢谢!
apache-spark - spark SQL GROUP BY 聚合引发错误“列表索引超出范围”
我在 spark 1.5.0 中有一个包含两列的数据框。以下查询正常工作:
sqlContext.sql("select id, value from table").show()
但是做聚合失败了:
sqlContext.sql("select id, count(value) from table group by id").show()
返回错误:
WARN TaskSetManager: Lost task 13.0 in stage 10.0: Traceback
...
IndexError: list Index out of range
我count
故意使用确实排除了错误类型的可能性。所有列都被视为字符串(尽管有些是数字)。我叫错了吗?
python - 如何在保留现有架构的同时从行中创建 DataFrame?
如果我调用 map 或mapPartition
并且我的函数从 PySpark 接收行,那么创建本地 PySpark 或 Pandas DataFrame 的自然方法是什么?结合行并保留模式的东西?
目前我做类似的事情: