谢谢大家的回复。
我已经为提出的问题创建了一个解决方案(并非唯一“正确”的解决方案),由于其他人可能也会觉得它有用,因此在此发布代码。我的方案是 Adam Matan 建议的选项 1 和选项 3 的混合体。代码中保留了来自我的 vi 会话的行号,便于下面的讨论引用。
12 # System libraries needed by this module.
13 import numpy, multiprocessing, time
14
15 # Third-party libraries needed by this module.
16 import labeledMatrix
17
18 # ----- Begin code for this module. -----
19 from commonFunctions import debugMessage
20
21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
22 exceptionType=ValueError, useNumType=numpy.float, verbose=False,
23 maxProcesses=None, processCheckTime=1.0 ):
24 """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
25 parsed by [fvFileParser].
26 [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
27
28 If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
29
30 [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
31 [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
32 [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
33
34 Return: a LabeledNumericMatrix with corresponding row and column IDs."""
35
36 # Setup row/col ID information.
37 useColIDs = list( colIDs )
38 useRowIDs = rowIDs or useColIDs  # fall back to the column IDs when rowIDs is None/empty
39 featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
40 verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )  # short-circuit logging idiom
41 featureIDs = featureData.keys()  # NOTE(review): Py2 keys() is a list; a set would make the membership test below O(1)
42 absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
43 if absentIDs:
44 raise exceptionType, "IDs %s not found in feature vector file." % absentIDs  # Py2 raise syntax
45 # Otherwise, proceed to creation of matrix.
46 resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
47 calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False  # symmetric -> compute each unordered pair once
48
49 # Setup data structures needed for parallelization.
50 numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
51 assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
52 dataManager = multiprocessing.Manager()
53 sharedFeatureData = dataManager.dict( featureData )  # proxy dict shared with the worker processes
54 resultQueue = multiprocessing.Queue()
55 # Assign jobs evenly through number of processors available.
56 jobList = [ list() for i in range(numSubprocesses) ]  # one pair-list per worker, filled round-robin
57 calculationNumber = 0 # Will hold total number of results stored.
58 if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
59 remainingIDs = list( useRowIDs )
60 while remainingIDs:
61 firstID = remainingIDs[0]
62 for secondID in remainingIDs:  # includes the (firstID, firstID) diagonal pair
63 jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
64 calculationNumber += 1
65 remainingIDs.remove( firstID )  # firstID is now paired with every remaining ID; shrink the pool
66 else: # Straight processing one at a time.
67 for rowID in useRowIDs:
68 for colID in useColIDs:
69 jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
70 calculationNumber += 1
71
72 verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
73 # Define a function to perform calculation and store results
74 def runJobs( scoreFunc, pairs, featureData, resultQueue ):
75 for pair in pairs:
76 score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
77 resultQueue.put( ( pair, score ) )  # one (pair, score) result per calculation
78 verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
79
80
81 # Create processes to perform parallelized computing.
82 processes = list()
83 for num in range(numSubprocesses):
84 processes.append( multiprocessing.Process( target=runJobs,
85 args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
86 # Launch processes and wait for them to all complete.
87 import Queue # For Queue.Empty exception.
88 for p in processes:
89 p.start()
90 assignmentsCompleted = 0
91 while assignmentsCompleted < calculationNumber:
92 numActive = [ p.is_alive() for p in processes ].count( True )
93 verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
94 ( assignmentsCompleted, calculationNumber, numActive ) )
95 while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
96 try:
97 pair, score = resultQueue.get( block=False )
98 resultMatrix[ pair[0], pair[1] ] = score
99 assignmentsCompleted += 1
100 if calculateSymmetric:
101 resultMatrix[ pair[1], pair[0] ] = score  # mirror the score across the diagonal
102 except Queue.Empty:
103 break
104 if numActive == 0: finished = True  # NOTE(review): 'finished' is never read; the loop exits via the count check above
105 else:
106 time.sleep( processCheckTime )  # poll interval; lowering it speeds up result collection (see post)
107 # Result queue emptied and no active processes remaining - completed calculations.
108 return resultMatrix
109 ## end of createSimilarityMatrix()
第 36-47 行只是与原始问题定义相关的准备工作。绕过 cPython GIL 的多处理设置在第 49-56 行,第 57-70 行用于均匀地划分子任务。之所以用第 57-70 行的代码代替 itertools.product,是因为当行/列 ID 列表达到 40,000 左右时,itertools.product 生成的全部组合最终会占用大量内存。
要执行的实际计算在第 74-78 行,这里使用了 ID->vector 条目的共享字典和共享结果队列。
第 81-85 行设置了实际的 Process 对象,尽管它们实际上还没有启动。
在我的第一次尝试中(此处未显示),“try ... resultQueue.get() ... except ...”这段取结果并赋值的代码实际上位于外部控制循环(即“尚未完成全部计算”的 while 循环)之外。在 9x9 矩阵的单元测试中运行该版本时没有任何问题;然而,当规模增大到 200x200 或更大时,尽管两次执行之间没有改动任何代码,我发现这段代码会挂起。
根据这个讨论(http://bugs.python.org/issue8426)和 multiprocessing 的官方文档,如果底层实现的管道/套接字缓冲区不够大,使用 multiprocessing.Queue 可能会挂起。因此,这里作为我的解决方案给出的代码会在检查进程是否完成的同时定期清空队列(参见第 91-106 行),以便子进程可以继续把新结果放入队列,避免管道被填满。
当我在 1000x1000 的较大矩阵上测试代码时,我注意到计算代码的完成远远领先于队列和矩阵分配代码。使用 cProfile,我发现一个瓶颈是默认轮询间隔 processCheckTime=1.0(第 23 行),降低此值可以提高结果速度(有关时序示例,请参见帖子底部)。对于 Python 中的多处理新手来说,这可能是有用的信息。
总的来说,这可能不是最好的实现,但它确实为进一步优化提供了一个起点。正如人们常说的,通过并行化进行优化需要适当的分析和思考。
时序示例,全部使用 8 个 CPU。
200x200(20100 次计算/分配)
t=1.0 : 执行时间 18s
t=0.01:执行时间3s
500x500(125250 次计算/分配)
t=1.0 : 执行时间 86s
t=0.01:执行时间23s
如果有人想复制和粘贴代码,这是我用于部分开发的单元测试。显然,标记的矩阵类代码不在这里,并且指纹读取器/记分器代码不包括在内(尽管滚动您自己的代码非常简单)。当然,如果对某人有所帮助,我也很乐意分享该代码。
112 def unitTest(): # smoke test: builds a 10x10 symmetric similarity matrix from inline CSV-style data
113 import cStringIO, os  # Py2 stdlib: in-memory text buffer + platform line separator
114 from fingerprintReader import MismatchKernelReader
115 from fingerprintScorers import FeatureVectorLinearKernel
116 exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117 exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK" + os.linesep )  # header row: feature-column labels
118 exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119 exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120 exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121 exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122 exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123 exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124 exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125 exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126 exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127 exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )  # last row: no trailing line separator
128 columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129 "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130 m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131 verbose=True, )  # rowIDs omitted -> symmetric matrix over columnIDs. NOTE(review): buffer is not rewound (seek(0)) after the writes — assumes the reader handles that; confirm
132 m.SetOutputPrecision( 6 )
133 print m  # Py2 print statement
134
135 ## end of unitTest()