我正在尝试使用 deephashes (SSDEEP) http://code.google.com/p/pyssdeep处理超过 130 万个文件
它的作用是,。它生成哈希(在 3-6 分钟内生成 130 万),然后相互比较以获得相似性结果。比较非常快,但仅运行单个进程不会完成。所以我们放入 Python 多处理模块来完成任务。
结果是在 30 分钟内完成了 130 万个文本文件。使用 18 个内核(四核至强处理器,共 24 个 CPU)
以下是每个过程的工作原理:
- 生成 SSDEEP 和。
- 将这些总和列表拆分为 5000 组块。
- 在 18 个过程中比较每个块 1 与 5000:每次迭代比较 18 个总和。
- 根据相似度得分对结果进行分组(默认值为 75)
- 删除了已经检查过下一次迭代的文件。
- 从下一个文件开始,下一个组的分数 < 75%
- 重复直到所有组都完成。
- 如果有未包含的文件(与任何文件不相似),则将它们添加到剩余列表中。
当所有处理完成后,剩余的文件被合并并递归地相互比较,直到没有结果。
问题是,当文件列表被分块成更小的 (5000) 文件时。有些文件包含在前 5000 个块中,但未包含在另一个组中,使组不完整。
如果我在没有分块的情况下运行,则循环完成需要很长时间。超过 18 小时未完成,。不知道过了多久。
请给我建议。
使用的模块:multiprocessing.Pool,ssdeep python
def ssdpComparer(lst, threshold):
s = ssdeep()
check_file = []
result_data = []
lst1 = lst
set_lst = set(lst)
print '>>>START'
for tup1 in lst1:
if tup1 in check_file:
continue
for tup2 in set_lst:
score = s.compare(tup1[0], tup2[0])
if score >= threshold:
result_data.append((score, tup1[2], tup2[2])) #Score, GroupID, FileID
check_file.append(tup2)
set_lst = set_lst.difference(check_file)
print """####### DONE #######"""
remain_lst = set(lst).difference(check_file)
return (result_data, remain_lst)
def parallelProcessing(tochunk_list, total_processes, threshold, source_path, mode, REMAINING_LEN = 0):
result = []
remainining = []
pooled_lst = []
pair = []
chunks_toprocess = []
print 'Total Files:', len(tochunk_list)
if mode == MODE_INTENSIVE:
chunks_toprocess = groupWithBlockID(tochunk_list) #blockID chunks
elif mode == MODE_THOROUGH:
chunks_toprocess = groupSafeLimit(tochunk_list, TOTAL_PROCESSES) #Chunks by processes
elif mode == MODE_FAST:
chunks_toprocess = groupSafeLimit(tochunk_list) #5000 chunks
print 'No. of files group to process: %d' % (len(chunks_toprocess))
pool_obj = Pool(processes = total_processes, initializer = poolInitializer, initargs = [None, threshold, source_path, mode])
pooled_lst = pool_obj.map(matchingProcess, chunks_toprocess) #chunks_toprocess
tmp_rs, tmp_rm = getResultAndRemainingLists(pooled_lst)
result += tmp_rs
remainining += tmp_rm
print 'RESULT LEN: %s, REMAINING LEN: %s, P.R.L: %s' % (len(result), len(remainining), REMAINING_LEN)
tmp_r_len = len(remainining)
if tmp_r_len != REMAINING_LEN and len(result) > 0 :
result += parallelProcessing(remainining, total_processes, threshold, source_path, mode, tmp_r_len)
else:
result += [('','', rf[2]) for rf in remainining]
return result
def getResultAndRemainingLists(pooled_lst):
g_result = []
g_remaining = []
for tup_result in pooled_lst:
tmp_result, tmp_remaining = tup_result
g_result += tmp_result
if tmp_remaining:
g_remaining += tmp_remaining
return (g_result, g_remaining)