6

以下是假设性的。假设我有一个字符串相似度函数,它产生一个布尔结果 ( string_sim),另一个确定两个纬度/经度坐标之间的距离是否低于阈值 ( geo_dist)

我决定使用模糊连接合并这些条件:

merge(LHS, RHS, by=string_sim(LHS$string, RHS$string) & geo_dist(LHS$lat, LHS$lon, RHS$lat,RHS$lon))

在引擎盖下, data.table 需要将笛卡尔乘积进行比较……每一行都不对称。在数以万亿计的中等规模数据集中,这很容易成为一个巨大的数字。因此,在将数据发送到比较函数时,它可能还需要使用分而治之的策略,利用多个进程,每个进程的单元数低于 20 亿,以避免整数限制。本质上,它需要将向量的各个部分映射成碎片,然后将它们发送到函数(这听起来像 map-reduce)

假设用户明智并希望通过迭代应用连接命令来节省时间,从运行成本最低的开始,例如相等条件。

merge(LHS, RHS, by=list(LHS$first_initial == RHS$first_initial, string_sim(LHS$string, RHS$string) & geo_dist(LHS$lat, LHS$lon, RHS$lat,RHS$lon)))

我想要这样的功能。它花了我一些时间,但我已经使用 data.table 编写了一些代码,并且该软件包将来可能会附带类似的东西。

编辑:让我以更 data.table 原生的方式表达这一点。首先定义要匹配相等的变量:

setkey(LHS, first_initial)
setkey(RHS, first_initial)

然后通过矢量扫描进行二进制合并:

LHS[RHS][string_sim(string, string.1) & geo_dist(lat, lon, lat.1,lon.1)]

或者,可以先完成最昂贵的操作。我认为以下会更快:

LHS[RHS][geo_dist(lat, lon, lat.1,lon.1)][string_sim(string, string.1)]

但是,当 LHS 为 2000 万行,RHS 为 2000 万行时,这会使系统过载。为了防止这种情况,我需要使用分而治之的方法将 LHS 和 RHS 拆分为多个部分。如果有一些更有效的方法可以在后端并行化进程,我认为我不必做所有这些。

这是代码:

joiner <- function(x,y, reduce=quote( x[map$x][y[map$y],nomatch=0] ), chunks = 100000, mc.cores = getOption("cores")){
    require("multicore")
    require("data.table")
    map_function <- function(x_N,y_N, chunks){  
        x_partitions = ceiling(x_N/chunks)
        x_parts = (0:x_partitions)*chunks 
        x_parts[length(x_parts)]=x_N            
        y_partitions = ceiling(y_N/chunks)
        y_parts = (0:y_partitions)*chunks 
        y_parts[length(y_parts)]=y_N            
        MAP = vector("list",x_partitions*y_partitions )
        index = 0
        for(i in 1:x_partitions){
            for(j in 1:y_partitions){
                index = index +1
                MAP[[index]] = list(
                        x = (x_parts[i]+1):x_parts[i+1],
                        y = (y_parts[j]+1):y_parts[j+1]
                )
            }
        }
        return(MAP) 
    }

    if(missing(y)){
        y=x
    } 

    reducer_function <- function(map, reduce, x,y){
            eval(reduce)
    }

    collector = mclapply(map_function(nrow(x),nrow(y),chunks=chunks),reducer_function,reduce, x,y, mc.cores=mc.cores)   
    collector = rbindlist(collector)
    return(collector)   
}
D = data.table(value=letters, row_id = sample(1:100,26)); D= rbind(D,D); setkey(D,value)
joiner(D); joiner(D,chunks=10); D[D] # they are all the same, except the key is gone
4

0 回答 0