0

我正在 pyspark 中创建一个循环,并且我收到以下消息:
"Column is not iterable" 

这是代码:

(regexp_replace(data_join_result[varibale_choisie],
(random.choice(data_join_result.collect()[j][varibale_choisie])),
data_join_result.collect()[j][lettre_choisie] )))) 

在错误信息中,问题就在此时出现:

data_join_result.collect()[j][lettre_choisie]

我的输入:
VARIABLEA | 变量B
蓝色 | 白粉
| DARK

我的预期输出:
VARIABLEA | 变量
B BLTE | 白粉
| 达姆

如果有人知道如何解决它!谢谢

4

2 回答 2

0


>最后,我找到了如何创建一个**循环来破坏数据集**。如果有人需要一天,我会分享!

首先,您需要定义要创建的错误、要用于替换的字母,例如要损坏的变量,然后添加带有特殊字符的错误:

lettre = [ "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"]

code_erreur= [ "replace","inserte","delete","espace","caract_spe", "NA","inverse"]

nombre_erreur=["1","1","1","2"]

varibale =["VARIABLEA","VARIABLEB"]

caract_spe =["_", "^", "¨", "", ".", "é", "-", "*","ù","ï","à","è","î","â"]
  • 我创建了一个列表“nombre_erreur”,因为我想要 75% 的数据集有 1 个错误,25% 有 2 个错误。

接下来,创建定义:

def def_code_erreur(code_erreur,varibale ,nombre_erreur,lettre,caract_spe):

  if type_erreur=="delete":
    for i in range(0,int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1,longueur))
      col1 = col1[:pos] + col1[(pos+1):]
      
  if type_erreur=="espace":
    for i in range(0,int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1,longueur))
      col1 = col1[:pos] + " " + col1[(pos):]
      
  if type_erreur=="inserte":
    for i in range(0,int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1,longueur))
      col1 = col1[:pos] + lettre_choisie + col1[(pos):] 
      
  if type_erreur=="caract_spe":
    for i in range(0,int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1,longueur))
      col1 = col1[:pos] + caract_spe_choisi + col1[(pos):]
      
  if type_erreur=="replace":
    for i in range(0,int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1,longueur))
      col1 = col1[:pos-1] + lettre_choisie + col1[(pos):]      
      
  if type_erreur=="inverse":
    for i in range(0,int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1,longueur))
      col1 = col1[:pos-1] + col1[pos:pos+1] + col1[pos-1:pos] + col1[(pos+1):]      
      
  if type_erreur=="NA":
    for i in range(0,int(nb_erreur)):
      col1 = col1

    
  return col1


udf_def_code_erreur = udf(def_code_erreur, StringType())

好吧,您必须调用“udf_def_code_erreur”!如果要破坏整个数据集,可以循环调用它。

于 2020-07-29T09:40:22.527 回答
0

不建议在驱动程序中收集数据,还要遍历数据框。Spark 提供了多个 api,允许我们以并行方式执行我们的任务。在您的情况下,您可以尝试以下方法:

对于单个字符替换,请尝试此(性能密集型)选项

import pyspark.sql.functions as F
import string
import random
test1 = spark.createDataFrame([("Mike","apple", "oranges", "red wine"),("Kate","Whitewine", "green beans", "waterrr"), ("Leah", "red wine","juice","rice")],schema=["col1","col2","col3","col4"])
cols = test1.columns
alp=(list(string.ascii_lowercase))  

@F.udf(test1.schema)
    def randomize(row):
        row_d = row.asDict()
        pos_sel = random.randint(0,len(cols)-1)    
        col_select = cols[pos_sel]
        row_d[col_select]=row_d[col_select].replace(alp[random.randint(1,24)],alp[random.randint(1,24)],1)
        return(row_d)
    
    test2 = test1.withColumn("struct_coln",randomize(F.struct(cols))).select('struct_coln.*')

结果:

+----+---------+-----------+--------+
|col1|col2     |col3       |col4    |
+----+---------+-----------+--------+
|Mike|apple    |orangos    |red wine|
|Kate|Whitewine|green beans|waterrr |
|Leah|red wine |juice      |rice    |
+----+---------+-----------+--------+

你可以看到橙子被损坏为橙子。如果您将要替换的字母限制为仅元音,则腐败的可能性会增加。

如果您不需要替换一个字符,请尝试以下操作:

test1 = spark.createDataFrame([("Mike","apple", "oranges", "red wine"),("Kate","Whitewine", "green beans", "waterrr"), ("Leah", "red wine","juice","rice")],schema=["col1","col2","col3","col4"])
cols = test1.columns
alp=(list(string.ascii_lowercase))


#%%"
for i in range(30):    
    pos_sel = random.randint(0,len(cols)-1)    
    col_select = cols[pos_sel]
    tst_rep = test1.withColumn(col_select,F.translate(F.col(col_select),alp[random.randint(1,24)],alp[random.randint(1,24)]))
    test1 = tst_rep

在这里,您可以通过控制循环迭代来进行一些控制

结果:

test1.show()
+----+---------+-----------+--------+
|col1|     col2|       col3|    col4|
+----+---------+-----------+--------+
|Mike|    applu|    oranges|rjd winj|
|Kate|Whifuwinu|green beans| watjrrr|
|Leah| rud winu|      juihe|    ricj|
+----+---------+-----------+--------+
于 2020-07-09T16:25:50.707 回答