问题标签 [pyspark-dataframes]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pyspark - Pyspark:使用子字符串和频率向量创建字符串
我想从子字符串列表和相应的频率列表中创建一个字符串。例如我的df_in
样子如下:
我希望我df_out
看起来像这样:
由于数据集非常大(~22Mio 行),我想尽可能避免 for 循环。有什么优雅的方法可以实现这一目标吗?
非常感谢!
编辑:我目前的做法:
问题:
我读到为了加快计算速度,UDF 应该以 pyspark 方式重写。我不知道如何做到这一点。我还发现dtype
了df_in.frequency
is array<decimal(4.0)>
。所以我试图将这些值转换为int
first 或int
在运行时将它们转换为。
apache-spark - 保存 pyspark 数据框后列的可空性不正确
当保存一个 pyspark 数据框并添加了一个带有“withColumn”函数的新列时,可空性从 false 变为 true。
版本信息:Python 3.7.3/Spark2.4.0-cdh6.1.1
为什么在持久化数据帧时,newCol
添加函数的列的可为空标志会发生变化?withColumn
python - Add column with closest vaues to PySpark Dataframe
I have a PySpark dataframe (say df
) which has two columns ( Name
and Score
). Following is an example of the dataframe:
I have a numpy array (say bin_array
) which has values close to the numerical values that are there in the column titled Score
of the PySpark dataframe.
Following is the aforementioned numpy array:
I want to compare value from each row of the column Score
with values in bin_array
and store the closest value (gotten from bin_array
) in a separate column in the PySpark dataframe.
Below is how I would like my new dataframe (say df_new
) to look.
I have the below mentioned function which gives me the closest values from bin_array
. The function works fine when I test it with individual numbers.
In my actual work, I will have millions of rows in the datafrmae. What is the most efficient way to create df_new
?
Following are the steps that I tried to use to create user-defined function (udf) and the new data frame (df_new
).
But, I got errors when I tried df_new.show()
. A portion of the error is shown below.
You can use the below mentioned steps to create the aforementioned dataframe:
amazon-web-services - 为什么我们需要 distcp 命令将数据从 hdfs 复制到 s3,而我们可以直接将数据写入 s3 位置?
请帮助我了解 distcp 的使用,我们使用的是 s3,在一些脚本中我可以看到他们直接将数据写入 s3,并且很多情况下将数据写入 hdfs,然后使用 distcp 将数据复制到 s3。
那么什么时候使用distcp,什么时候可以直接写入云呢?
pyspark - 如何遍历数据框的数组值?
我有一个看起来像这样的 Pyspark 数据框
还有一个像这样的查找表/df
对于每一行,df1
我需要在数据框中查找每个数组元素lookup
并返回 true 或 false[T,T,T,T,F]
我怎样才能循环通过df1
?
apache-spark - 将 spark 数据框转换为没有 pandas 数据框的元组列表
我有一个将熊猫数据框转换为元组列表的现有逻辑。
其中 df 是熊猫数据框。
有人请帮我在 pyspark 中实现没有熊猫的相同逻辑。
python - 从 PySpark 中的类别分布中查找值的百分位数
我有以下 PySpark 数据框(比如df
)。它有name
、timestamp
和category
列value
。
我想在上述数据框中添加一个新列,它为我提供了分布中每个名称的值的百分位排名,其中包括相同category
和timestamp
.
我的预期输出如下:
做这个的最好方式是什么?
我尝试了以下方法:
这给出了正确的预期输出。但是,当我对拥有数百万行的实际数据进行尝试时,这种方法需要很长时间(数小时)。
df
您可以使用下面提到的代码生成上面()给出的数据框:
pyspark - 在 Pyspark 中具有条件的 Groupby
我的数据框看起来像
我想接第一个txn_date
之后reg_date
,即第txn_date
一个reg_date >= txn_date
。
预期产出
到目前为止,我已经做到了,
但是得到错误的结果。
python - 派斯帕克 | ModuleNotFoundError:没有名为“广告”的模块
使用 PySpark,我正在尝试向现有数据框中添加一个新列,其中新列中的条目表示最接近现有列的 bin 值。在我将在下面显示的示例中,numpy 数组bucket_array
表示箱(桶)。
PySpark 代码的相关部分,我将很快提到其错误,如下所示:
当我在 Jupyter notebook 中运行上述代码时,它运行良好,并且我能够看到数据框bucket_df
。
同样,当我将上述代码保存为单独的 python 函数时,将其导入我的 Jupyter 笔记本,然后最后执行它,我得到错误。我注意到错误发生在行bucket_df.show()
。该错误的一部分如下所示:
完整的错误可以在这里找到。
当我用 替换该行时bucket.show()
,print( bucket.count() )
我看不到任何错误并且它运行良好(即使我将上述代码用作单独的函数)。
entity_pct_metric_df
下面给出一个例子:
如何解决上述错误?
apache-spark - 从数据帧的每条记录创建 XML 请求
我尝试了很多选项,包括 withColumn、udf、lambda、foreach、map,但没有得到预期的输出。最大时,我只能转换第一条记录。inputfile.json 将继续增加,并且期望操作应该以所需的结构提供 xml。稍后我将在 Kafka 上制作预期的操作。火花 2.3,Python 2.7。需要在 PySpark 中做。
编辑1:
我可以在具有所需 xml 的主数据框中添加一列。我使用withColumn
并且functions.format_string
能够将字符串(xml 结构)添加到数据框的列中。
现在我的下一个目标是只为 Kafka 生成新列的值。我正在使用df.foreachPartition(send_to_kafka)
并创建了如下功能:
但不幸的是,它做了两件事
:在 Kafka 上生成记录为{'newColumn':u'myXMLPayload'}
. 我不要那个。我只想myXMLPayload
在 Kafka 上制作。
湾。它将 u' 添加到值以对值进行 unicoding。
我想摆脱这两个部分,我会很高兴。任何帮助,将不胜感激。