问题标签 [pyspark-dataframes]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
76 浏览

pyspark - Pyspark:使用子字符串和频率向量创建字符串

我想从子字符串列表和相应的频率列表中创建一个字符串。例如我的df_in样子如下:

我希望我df_out看起来像这样:

由于数据集非常大(~22Mio 行),我想尽可能避免 for 循环。有什么优雅的方法可以实现这一目标吗?

非常感谢!

编辑:我目前的做法:

问题:

我读到为了加快计算速度,UDF 应该以 pyspark 方式重写。我不知道如何做到这一点。我还发现dtypedf_in.frequencyis array<decimal(4.0)>。所以我试图将这些值转换为intfirst 或int在运行时将它们转换为。

0 投票
0 回答
147 浏览

apache-spark - 保存 pyspark 数据框后列的可空性不正确

当保存一个 pyspark 数据框并添加了一个带有“withColumn”函数的新列时,可空性从 false 变为 true。

版本信息:Python 3.7.3/Spark2.4.0-cdh6.1.1

为什么在持久化数据帧时,newCol添加函数的列的可为空标志会发生变化?withColumn

0 投票
2 回答
306 浏览

python - Add column with closest vaues to PySpark Dataframe

I have a PySpark dataframe (say df) which has two columns ( Name and Score). Following is an example of the dataframe:

I have a numpy array (say bin_array) which has values close to the numerical values that are there in the column titled Score of the PySpark dataframe.

Following is the aforementioned numpy array:

I want to compare value from each row of the column Score with values in bin_array and store the closest value (gotten from bin_array) in a separate column in the PySpark dataframe.

Below is how I would like my new dataframe (say df_new) to look.

I have the below mentioned function which gives me the closest values from bin_array. The function works fine when I test it with individual numbers.

In my actual work, I will have millions of rows in the datafrmae. What is the most efficient way to create df_new?

Following are the steps that I tried to use to create user-defined function (udf) and the new data frame (df_new).

But, I got errors when I tried df_new.show(). A portion of the error is shown below.

You can use the below mentioned steps to create the aforementioned dataframe:

0 投票
1 回答
170 浏览

amazon-web-services - 为什么我们需要 distcp 命令将数据从 hdfs 复制到 s3,而我们可以直接将数据写入 s3 位置?

请帮助我了解 distcp 的使用,我们使用的是 s3,在一些脚本中我可以看到他们直接将数据写入 s3,并且很多情况下将数据写入 hdfs,然后使用 distcp 将数据复制到 s3。

那么什么时候使用distcp,什么时候可以直接写入云呢?

0 投票
0 回答
260 浏览

pyspark - 如何遍历数据框的数组值?

我有一个看起来像这样的 Pyspark 数据框

还有一个像这样的查找表/df

对于每一行,df1我需要在数据框中查找每个数组元素lookup并返回 true 或 false[T,T,T,T,F]

我怎样才能循环通过df1

0 投票
2 回答
4680 浏览

apache-spark - 将 spark 数据框转换为没有 pandas 数据框的元组列表

我有一个将熊猫数据框转换为元组列表的现有逻辑。

其中 df 是熊猫数据框。

有人请帮我在 pyspark 中实现没有熊猫的相同逻辑。

0 投票
1 回答
241 浏览

python - 从 PySpark 中的类别分布中查找值的百分位数

我有以下 PySpark 数据框(比如df)。它有nametimestampcategoryvalue

我想在上述数据框中添加一个新列,它为我提供了分布中每个名称的值的百分位排名,其中包括相同categorytimestamp.

我的预期输出如下:

做这个的最好方式是什么?

我尝试了以下方法:

这给出了正确的预期输出。但是,当我对拥有数百万行的实际数据进行尝试时,这种方法需要很长时间(数小时)。

df您可以使用下面提到的代码生成上面()给出的数据框:

0 投票
2 回答
2846 浏览

pyspark - 在 Pyspark 中具有条件的 Groupby

我的数据框看起来像

我想接第一个txn_date之后reg_date,即第txn_date一个reg_date >= txn_date

预期产出

到目前为止,我已经做到了,

但是得到错误的结果。

0 投票
0 回答
397 浏览

python - 派斯帕克 | ModuleNotFoundError:没有名为“广告”的模块

使用 PySpark,我正在尝试向现有数据框中添加一个新列,其中新列中的条目表示最接近现有列的 bin 值。在我将在下面显示的示例中,numpy 数组bucket_array表示箱(桶)。

PySpark 代码的相关部分,我将很快提到其错误,如下所示:

当我在 Jupyter notebook 中运行上述代码时,它运行良好,并且我能够看到数据框bucket_df

同样,当我将上述代码保存为单独的 python 函数时,将其导入我的 Jupyter 笔记本,然后最后执行它,我得到错误。我注意到错误发生在行bucket_df.show()。该错误的一部分如下所示:

完整的错误可以在这里找到。

当我用 替换该行时bucket.show()print( bucket.count() )我看不到任何错误并且它运行良好(即使我将上述代码用作单独的函数)。

entity_pct_metric_df下面给出一个例子:

如何解决上述错误?

0 投票
0 回答
56 浏览

apache-spark - 从数据帧的每条记录创建 XML 请求

我尝试了很多选项,包括 withColumn、udf、lambda、foreach、map,但没有得到预期的输出。最大时,我只能转换第一条记录。inputfile.json 将继续增加,并且期望操作应该以所需的结构提供 xml。稍后我将在 Kafka 上制作预期的操作。火花 2.3,Python 2.7。需要在 PySpark 中做。

编辑1:

我可以在具有所需 xml 的主数据框中添加一列。我使用withColumn并且functions.format_string能够将字符串(xml 结构)添加到数据框的列中。

现在我的下一个目标是只为 Kafka 生成新列的值。我正在使用df.foreachPartition(send_to_kafka)并创建了如下功能:

但不幸的是,它做了两件事
:在 Kafka 上生成记录为{'newColumn':u'myXMLPayload'}. 我不要那个。我只想myXMLPayload在 Kafka 上制作。
湾。它将 u' 添加到值以对值进行 unicoding。

我想摆脱这两个部分,我会很高兴。任何帮助,将不胜感激。