-3

我有一个 csv 文件。

D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address
2,66M,J,Rock,F,1995,201211.0,J
3,David,HM,Lee,M,1991,201211.0,J
6,66M,,Rock,F,1990,201211.0,J
0,David,H M,Lee,M,1990,201211.0,B
3,Marc,H,Robert,M,2000,201211.0,C
6,Marc,M,Robert,M,1988,201211.0,C
6,Marc,MS,Robert,M,2000,201211.0,D

我想为居住在同一地址的具有相同姓氏的人分配相同的 ID 或索引。ID最好只由数字组成。
如果人在同一个地方有不同的姓氏,那么ID应该不同。这样的 ID 应该是唯一的。即,地址或姓氏不同的人,ID 必须不同。我的预期输出是

D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address,ID
2,66M,J,Rock,F,1995,201211.0,J,11
3,David,HM,Lee,M,1991,201211.0,J,12
6,66M,,Rock,F,1990,201211.0,J,11
0,David,H M,Lee,M,1990,201211.0,B,13
3,Marc,H,Robert,M,2000,201211.0,C,14
6,Marc,M,Robert,M,1988,201211.0,C,14
6,Marc,MS,Robert,M,2000,201211.0,D,15

我的数据文件大小约为 30 GB。我正在考虑groupBy根据由这些观察组成的键在火花中使用函数LNAME并将address这些观察组合在一起。然后按键为其分配一个ID。但我不知道该怎么做。之后,也许我可以用它flatMap来分割线并返回带有 ID 的观察结果。但我不确定。另外,我也可以在Linux环境下制作吗?谢谢你。

4

3 回答 3

2

由于您有 30GB 的输入数据,您可能不希望尝试将其全部保存在内存数据结构中的东西。让我们改用磁盘空间。

这是一种将所有数据加载到 sqlite 数据库中的方法,并为每个唯一的姓氏和地址对生成一个 id,然后将所有内容重新连接在一起:

#!/bin/sh

csv="$1"
# Use an on-disk database instead of in-memory because source data is 30gb.
# This will take a while to run.
db=$(mktemp -p .)

sqlite3 -batch -csv -header "${db}" <<EOF
.import "${csv}" people
CREATE TABLE ids(id INTEGER PRIMARY KEY, lname, address, UNIQUE(lname, address));
INSERT OR IGNORE INTO ids(lname, address) SELECT lname, address FROM people;
SELECT p.*, i.id AS ID
FROM people AS p
JOIN ids AS i ON (p.lname, p.address) = (i.lname, i.address)
ORDER BY p.rowid;
EOF

rm -f "${db}"

例子:

$./makeids.sh data.csv
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address,ID
2,66M,J,Rock,F,1995,201211.0,J,1
3,David,HM,Lee,M,1991,201211.0,J,2
6,66M,"",Rock,F,1990,201211.0,J,1
0,David,"H M",Lee,M,1990,201211.0,B,3
3,Marc,H,Robert,M,2000,201211.0,C,4
6,Marc,M,Robert,M,1988,201211.0,C,4
6,Marc,MS,Robert,M,2000,201211.0,D,5

ID最好只由数字组成。

如果可以放宽该限制,您可以通过使用姓氏和地址的加密哈希作为 ID 一次性完成:

$ perl -MDigest::SHA=sha1_hex -F, -lane '
   BEGIN { $" = $, = "," } 
   if ($. == 1) { print @F, "ID" }
   else { print @F, sha1_hex("@F[3,7]") }' data.csv
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address,ID
2,66M,J,Rock,F,1995,201211.0,J,5c99211a841bd2b4c9cdcf72d7e95e46b2ae08b5
3,David,HM,Lee,M,1991,201211.0,J,c263f9d1feb4dc789de17a8aab8f2808aea2876a
6,66M,,Rock,F,1990,201211.0,J,5c99211a841bd2b4c9cdcf72d7e95e46b2ae08b5
0,David,H M,Lee,M,1990,201211.0,B,e86e81ab2715a8202e41b92ad979ca3a67743421
3,Marc,H,Robert,M,2000,201211.0,C,363ed8175fdf441ed59ac19cea3c37b6ce9df152
6,Marc,M,Robert,M,1988,201211.0,C,363ed8175fdf441ed59ac19cea3c37b6ce9df152
6,Marc,MS,Robert,M,2000,201211.0,D,cf5135dc402efe16cd170191b03b690d58ea5189

或者,如果唯一 lname、地址对的数量足够小,可以合理地将它们存储在系统上的哈希表中:

#!/usr/bin/gawk -f
BEGIN {
    FS = OFS = ","
}
NR == 1 {
    print $0, "ID"
    next
}
! ($4, $8) in ids {
    ids[$4, $8] = ++counter
}
{
    print $0, ids[$4, $8]
}
于 2019-10-09T02:11:35.067 回答
2

正如评论中所讨论的,基本思想是对数据进行适当的分区,使具有相同LNAME+Address的记录保持在同一个分区中,运行 Python 代码在每个分区上生成单独的idx,然后将它们合并到最终的id中。

注意:我在您的示例记录中添加了一些新行,请参见下df_new.show()图所示的结果。

from pyspark.sql import Window, Row
from pyspark.sql.functions import coalesce, sum as fsum, col, max as fmax, lit, broadcast

# ...skip code to initialize the dataframe 

# tweak the number of repartitioning N based on actual data size
N = 5

# Python function to iterate through the sorted list of elements in the same 
# partition and assign an in-partition idx based on Address and LNAME.
def func(partition_id, it):
  idx, lname, address = (1, None, None)
  for row in sorted(it, key=lambda x: (x.LNAME, x.Address)):
    if lname and (row.LNAME != lname or row.Address != address): idx += 1
    yield Row(partition_id=partition_id, idx=idx, **row.asDict())
    lname = row.LNAME
    address = row.Address

# Repartition based on 'LNAME' and 'Address' and then run mapPartitionsWithIndex()
# function to create in-partition idx. Adjust N so that records in each partition
# should be small enough to be loaded into the executor memory:
df1 = df.repartition(N, 'LNAME', 'Address') \
        .rdd.mapPartitionsWithIndex(func) \
        .toDF()

获取唯一行数cnt(基于地址+LNAME),max_idx然后获取此 rcnt 的运行 SUM。

# idx: calculated in-partition id
# cnt: number of unique ids in the same partition: fmax('idx')
# rcnt: starting_id for a partition(something like a running count): coalesce(fsum('cnt').over(w1),lit(0))
# w1: WindowSpec to calculate the above rcnt
w1 = Window.partitionBy().orderBy('partition_id').rowsBetween(Window.unboundedPreceding,-1)

df2 = df1.groupby('partition_id') \
         .agg(fmax('idx').alias('cnt')) \
         .withColumn('rcnt', coalesce(fsum('cnt').over(w1),lit(0)))

df2.show()
+------------+---+----+
|partition_id|cnt|rcnt|
+------------+---+----+
|           0|  3|   0|
|           1|  1|   3|
|           2|  1|   4|
|           4|  1|   5|
+------------+---+----+

将 df1 与 df2 连接并创建最终 id,即idx + rcnt

df_new = df1.join(broadcast(df2), on=['partition_id']).withColumn('id', col('idx')+col('rcnt'))

df_new.show()
#+------------+-------+---+----+-----+------+------+-----+---+--------+---+----+---+
#|partition_id|Address|  D| DOB|FNAME|GENDER| LNAME|MNAME|idx|snapshot|cnt|rcnt| id|
#+------------+-------+---+----+-----+------+------+-----+---+--------+---+----+---+
#|           0|      B|  0|1990|David|     M|   Lee|  H M|  1|201211.0|  3|   0|  1|
#|           0|      J|  3|1991|David|     M|   Lee|   HM|  2|201211.0|  3|   0|  2|
#|           0|      D|  6|2000| Marc|     M|Robert|   MS|  3|201211.0|  3|   0|  3|
#|           1|      C|  3|2000| Marc|     M|Robert|    H|  1|201211.0|  1|   3|  4|
#|           1|      C|  6|1988| Marc|     M|Robert|    M|  1|201211.0|  1|   3|  4|
#|           2|      J|  6|1991|  66M|     F|   Rek| null|  1|201211.0|  1|   4|  5|
#|           2|      J|  6|1992|  66M|     F|   Rek| null|  1|201211.0|  1|   4|  5|
#|           4|      J|  2|1995|  66M|     F|  Rock|    J|  1|201211.0|  1|   5|  6|
#|           4|      J|  6|1990|  66M|     F|  Rock| null|  1|201211.0|  1|   5|  6|
#|           4|      J|  6|1990|  66M|     F|  Rock| null|  1|201211.0|  1|   5|  6|
#+------------+-------+---+----+-----+------+------+-----+---+--------+---+----+---+

df_new = df_new.drop('partition_id', 'idx', 'rcnt', 'cnt')

一些注意事项:

  • 实际上,在将LNAMEAddress列用作唯一性检查之前,您需要清除/规范化它们。例如,使用uniq_keyLNAME地址组合在一起的单独列作为数据框的唯一键。请参阅下面的一些基本数据清理过程的示例:

    from pyspark.sql.functions import coalesce, lit, concat_ws, upper, regexp_replace, trim
    
    #(1) convert NULL to '': coalesce(col, '')
    #(2) concatenate LNAME and Address using NULL char '\x00' or '\0'
    #(3) convert to uppercase: upper(text)
    #(4) remove all non-[word/whitespace/NULL_char]: regexp_replace(text, r'[^\x00\w\s]', '')
    #(5) convert consecutive whitespaces to a SPACE: regexp_replace(text, r'\s+', ' ')
    #(6) trim leading/trailing spaces: trim(text)
    df = (df.withColumn('uniq_key',
        trim(
          regexp_replace(
            regexp_replace(
              upper(
                concat_ws('\0', coalesce('LNAME', lit('')), coalesce('Address', lit('')))
              ),
              r'[^\x00\s\w]+',
              ''
            ),
            r'\s+',
            ' '
          )
        )
    ))
    

    然后在代码中,替换和'LNAME'用来找到idx'Address'uniq_key

  • 正如cronoik在评论中提到的,您还可以尝试使用 Window rank 函数之一来计算分区内的 idx。例如:

    from pyspark.sql.functions import spark_partition_id, dense_rank
    
    # use dense_rank to calculate the in-partition idx
    w2 = Window.partitionBy('partition_id').orderBy('LNAME', 'Address')
    df1 = df.repartition(N, 'LNAME', 'Address') \
            .withColumn('partition_id', spark_partition_id()) \
            .withColumn('idx', dense_rank().over(w2))
    

    有了 df1 后,使用与上述相同的方法计算 df2 和 df_new。这应该比使用mapPartitionsWithIndex()基本上是基于 RDD 的方法更快。

  • 对于您的真实数据,调整N以适合您的实际数据大小。这N只会影响初始分区,在数据帧加入后,分区将重置为默认值(200)。spark.sql.shuffle.partitions例如,您可以在初始化 spark 会话时调整它:

    spark = SparkSession.builder \
                        ....
                        .config("spark.sql.shuffle.partitions", 500) \
                        .getOrCreate()
    
于 2019-10-27T02:29:56.910 回答
0
$ sort -t, -k8,8 -k4,4 <<EOD | awk -F, '  $8","$4 != last { ++id; last = $8","$4 }
                                          { NR!=1 && $9=id; print }' id=9 OFS=,
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address
2,66M,J,Rock,F,1995,201211.0,J
3,David,HM,Lee,M,1991,201211.0,J
6,66M,,Rock,F,1990,201211.0,J
0,David,H M,Lee,M,1990,201211.0,B
3,Marc,H,Robert,M,2000,201211.0,C
6,Marc,M,Robert,M,1988,201211.0,C
6,Marc,MS,Robert,M,2000,201211.0,D
> EOD
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address
0,David,H M,Lee,M,1990,201211.0,B,11
3,Marc,H,Robert,M,2000,201211.0,C,12
6,Marc,M,Robert,M,1988,201211.0,C,12
6,Marc,MS,Robert,M,2000,201211.0,D,13
3,David,HM,Lee,M,1991,201211.0,J,14
2,66M,J,Rock,F,1995,201211.0,J,15
6,66M,,Rock,F,1990,201211.0,J,15
$
于 2019-10-09T03:38:48.397 回答