6

我有一个包含 {string:list} 条目的字典 D,并且我为 D 中的一对字符串 (s1,s2) 计算了一个函数 f( D[s1],D[s2] ) --> float。

此外,我创建了一个自定义矩阵类 LabeledNumericMatrix,它允许我执行诸如 m[ ID1, ID2 ] = 1.0 之类的赋值。

我需要计算 f(x,y) 并将结果存储在 m[x,y] 中,用于字符串集合 S 中的所有 2 元组,包括当 s1=s2 时。这很容易编码为循环,但是随着集合 S 的大小增长到较大的值(例如 10,000 或更多),此代码的执行需要相当长的时间。

我存储在标记矩阵 m 中的所有结果都不相互依赖。因此,使用 python 的多线程或多进程服务并行化这种计算似乎很简单。但是,由于 cPython 并没有真正允许我通过线程同时执行 f(x,y) 的计算和 m[x,y] 的存储,看来多进程是我唯一的选择。但是,我不认为多进程旨在在进程之间传递大约 1GB 的数据结构,例如我的标记矩阵结构包含 10000x10000 个元素。

任何人都可以提供以下建议(a)我是否应该避免尝试并行化我的算法,以及(b)如果我可以进行并行化,如何做到这一点,最好是在 cPython 中?

4

5 回答 5

6

第一个选项 -服务器进程

创建一个服务器进程。它是 Multiprocessing 包的一部分,允许并行访问数据结构。这样每个进程都将直接访问数据结构,锁定其他进程。

文档中

服务器进程

Manager() 返回的管理器对象控制一个服务器进程,该进程保存 Python 对象并允许其他进程使用代理来操作它们。

Manager() 返回的管理器将支持类型 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value 和 Array。

第二种选择 - 工人池

创建一个工人池、一个输入队列和一个结果队列。

  • 作为生产者的主进程将向输入队列提供对 (s1, s2)。
  • 每个工作进程将从输入队列中读取一对,并将结果写入输出队列。
  • 主线程将从结果队列中读取结果,并将它们写入结果字典。

第三种选择 - 划分为独立问题

您的数据是独立的: f( D[s i ],D[s j ] ) 是一个隐蔽的问题,独立于任何 f( D[s k ],D[s l ] ) 。此外,每一对的计算时间应该相当相等,或者至少在相同的数量级。

将任务分成n 个输入集,其中n是您拥有的计算单元(核心,甚至计算机)的数量。将每个输入集分配给不同的进程,并加入输出。

于 2012-02-20T10:10:30.923 回答
2

你绝对不会得到任何性能提升threading——它是一个不适合 CPU 密集型任务的工具。

所以唯一可能的选择是multiprocessing,但由于你有一个大数据结构,我建议像mmap(相当低级,但内置)或Redis(美味和高级 API,但应该安装和配置)。

于 2012-02-20T10:14:14.963 回答
1

你分析过你的代码吗?它只是计算 f 太昂贵,还是将结果存储在数据结构中(或两者兼而有之)?

如果 f 占主导地位,那么您应该确保在开始担心并行化之前无法进行算法改进。您可以通过将部分或全部函数转换为 C 扩展(可能使用cython )来大大加快速度。如果您确实使用多处理,那么我不明白为什么需要在进程之间传递整个数据结构?

如果将结果存储在矩阵中过于昂贵,您可以通过使用更高效的数据结构(如array.arraynumpy.ndarray)来加快代码速度。除非您非常仔细地设计和实现自定义矩阵类,否则它几乎肯定会比那些慢。

于 2012-02-20T12:34:18.077 回答
0

谢谢大家的回复。

我已经为提议的问题创建了一个解决方案(不是“解决方案”),并且由于其他人可能会发现它很有用,因此我在此处发布代码。我的解决方案是 Adam Matan 建议的选项 1 和 3 的混合体。该代码包含来自我的 vi 会话的行号,这将有助于下面的讨论。

 12 # System libraries needed by this module.
 13 import numpy, multiprocessing, time
 14 
 15 # Third-party libraries needed by this module.
 16 import labeledMatrix
 17 
 18 # ----- Begin code for this module. -----
 19 from commonFunctions import debugMessage
 20 
 21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
 22                             exceptionType=ValueError, useNumType=numpy.float, verbose=False,
 23                             maxProcesses=None, processCheckTime=1.0 ):
 24  """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
 25  parsed by [fvFileParser].
 26  [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
 27 
 28  If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
 29 
 30  [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
 31  [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
 32  [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
 33 
 34  Return: a LabeledNumericMatrix with corresponding row and column IDs."""
 35 
 36  # Setup row/col ID information.
 37  useColIDs = list( colIDs )
 38  useRowIDs = rowIDs or useColIDs
 39  featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
 40  verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
 41  featureIDs = featureData.keys()
 42  absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
 43  if absentIDs: 
 44   raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
 45  # Otherwise, proceed to creation of matrix.
 46  resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
 47  calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
 48  
 49  # Setup data structures needed for parallelization.
 50  numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
 51  assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
 52  dataManager = multiprocessing.Manager()
 53  sharedFeatureData = dataManager.dict( featureData )
 54  resultQueue = multiprocessing.Queue() 
 55  # Assign jobs evenly through number of processors available.
 56  jobList = [ list() for i in range(numSubprocesses) ]
 57  calculationNumber = 0 # Will hold total number of results stored.
 58  if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
 59   remainingIDs = list( useRowIDs )
 60   while remainingIDs:
 61    firstID = remainingIDs[0]
 62    for secondID in remainingIDs:
 63     jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
 64     calculationNumber += 1
 65    remainingIDs.remove( firstID )
 66  else: # Straight processing one at a time.
 67   for rowID in useRowIDs:
 68    for colID in useColIDs:
 69     jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
 70     calculationNumber += 1
 71     
 72  verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
 73  # Define a function to perform calculation and store results
 74  def runJobs( scoreFunc, pairs, featureData, resultQueue ):
 75   for pair in pairs:
 76    score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
 77    resultQueue.put( ( pair, score ) )
 78   verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
 79   
 80   
 81  # Create processes to perform parallelized computing.
 82  processes = list()
 83  for num in range(numSubprocesses):
 84   processes.append( multiprocessing.Process( target=runJobs,
 85                                              args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
 86  # Launch processes and wait for them to all complete.
 87  import Queue # For Queue.Empty exception.
 88  for p in processes:
 89   p.start()
 90  assignmentsCompleted = 0
 91  while assignmentsCompleted < calculationNumber:
 92   numActive = [ p.is_alive() for p in processes ].count( True )
 93   verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
 94               ( assignmentsCompleted, calculationNumber, numActive ) )
 95   while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
 96    try: 
 97     pair, score = resultQueue.get( block=False )
 98     resultMatrix[ pair[0], pair[1] ] = score
 99     assignmentsCompleted += 1
100     if calculateSymmetric:
101      resultMatrix[ pair[1], pair[0] ] = score
102    except Queue.Empty:
103     break 
104   if numActive == 0: finished = True
105   else:
106    time.sleep( processCheckTime )
107  # Result queue emptied and no active processes remaining - completed calculations.
108  return resultMatrix
109 ## end of createSimilarityMatrix()

第 36-47 行只是与作为原始问题一部分的问题定义相关的初步内容。绕过 cPython 的 GIL 的多处理设置在第 49-56 行,第 57-70 行用于均匀地创建细分任务。使用第 57-70 行的代码代替 itertools.product,因为当行/列 ID 列表达到 40,000 左右时,产品最终会占用大量内存。

要执行的实际计算在第 74-78 行,这里使用了 ID->vector 条目的共享字典和共享结果队列。

第 81-85 行设置了实际的 Process 对象,尽管它们实际上还没有启动。

在我的第一次尝试中(此处未显示),“try ... resultQueue.get() and assign except ...”代码实际上位于外部控制循环之外(虽然并非所有计算都已完成)。当我在 9x9 矩阵的单元测试中运行该版本的代码时,没有任何问题。然而,在向上移动到 200x200 或更大时,我发现此代码挂起,尽管在执行之间没有更改代码中的任何内容。

根据这个讨论(http://bugs.python.org/issue8426)和多进程的官方文档,如果底层实现没有非常大的管道/套接字大小,则使用 multiprocess.Queue 可能会挂起。因此,这里给出的代码作为我的解决方案会定期清空队列,同时检查进程的完成情况(参见第 91-106 行),以便子进程可以继续将新结果放入其中并避免管道过满。

当我在 1000x1000 的较大矩阵上测试代码时,我注意到计算代码的完成远远领先于队列和矩阵分配代码。使用 cProfile,我发现一个瓶颈是默认轮询间隔 processCheckTime=1.0(第 23 行),降低此值可以提高结果速度(有关时序示例,请参见帖子底部)。对于 Python 中的多处理新手来说,这可能是有用的信息。

总的来说,这可能不是最好的实现,但它确实为进一步优化提供了一个起点。正如人们常说的,通过并行化进行优化需要适当的分析和思考。

时序示例,全部使用 8 个 CPU。

200x200(20100 次计算/分配)

t=1.0 : 执行时间 18s

t=0.01:执行时间3s

500x500(125250 次计算/分配)

t=1.0 : 执行时间 86s

t=0.01:执行时间23s

如果有人想复制和粘贴代码,这是我用于部分开发的单元测试。显然,标记的矩阵类代码不在这里,并且指纹读取器/记分器代码不包括在内(尽管滚动您自己的代码非常简单)。当然,如果对某人有所帮助,我也很乐意分享该代码。

112 def unitTest():
113  import cStringIO, os
114  from fingerprintReader import MismatchKernelReader
115  from fingerprintScorers import FeatureVectorLinearKernel
116  exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117  exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK"  + os.linesep )
118  exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119  exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120  exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121  exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122  exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123  exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124  exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125  exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126  exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127  exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128  columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129                "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130  m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131                              verbose=True, )
132  m.SetOutputPrecision( 6 )
133  print m
134 
135 ## end of unitTest()
于 2012-03-21T15:21:04.187 回答
0

参考我在 3 月 21 日发布的代码所附的最后一条评论,我发现 multiprocessing.Pool + SQLite (pysqlite2) 无法用于我的特定任务,因为发生了两个问题:

(1) 使用默认连接,除了第一个worker,其他所有执行插入查询的worker进程只执行一次。(2) 当我将连接关键字更改为 check_same_thread=False 时,会使用完整的工作池,但只有部分查询成功,部分查询失败。当每个 worker 也执行 time.sleep(0.01) 时,查询失败的数量减少了,但不是全部。(3) 不太重要的是,即使对于 10 个插入查询的小型作业列表,我也能听到我的硬盘读/写疯狂。

接下来我求助于 MySQL-Python,结果好多了。诚然,必须为该用户设置 MySQL 服务器守护进程、用户和数据库,但这些步骤相对简单。

这是对我有用的示例代码。显然它可以被优化,但它为那些正在寻找如何开始使用多处理的人传达了基本思想。

  1 from multiprocessing import Pool, current_process
  2 import MySQLdb
  3 from numpy import random
  4
  5 
  6 if __name__ == "__main__":
  7  
  8   numValues   = 50000
  9   tableName   = "tempTable"
 10   useHostName = ""
 11   useUserName = ""  # Insert your values here.
 12   usePassword = ""
 13   useDBName   = ""
 14   
 15   # Setup database and table for results.
 16   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 17   topCursor = dbConnection.cursor()
 18   # Assuming table does not exist, will be eliminated at the end of the script.
 19   topCursor.execute( 'CREATE TABLE %s (oneText TEXT, oneValue REAL)' % tableName )
 20   topCursor.close() 
 21   dbConnection.close()
 22   
 23   # Define simple function for storing results.
 24   def work( storeValue ):
 25     #print "%s storing value %f" % ( current_process().name, storeValue )
 26     try:
 27       dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 28       cursor = dbConnection.cursor()
 29       cursor.execute( "SET AUTOCOMMIT=1" )
 30       try:
 31         query = "INSERT INTO %s VALUES ('%s',%f)" % ( tableName, current_process().name, storeValue )
 32         #print query
 33         cursor.execute( query )
 34       except:
 35         print "Query failed."
 36       
 37       cursor.close()
 38       dbConnection.close()
 39     except: 
 40       print "Connection/cursor problem."
 41   
 42   
 43   # Create set of values to assign
 44   values = random.random( numValues )
 45   
 46   # Create pool of workers 
 47   pool = Pool( processes=6 )
 48   # Execute assignments.
 49   for value in values: pool.apply_async( func=work, args=(value,) )
 50   pool.close()
 51   pool.join()
 52 
 53   # Cleanup temporary table.
 54   dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
 55   topCursor = dbConnection.cursor()
 56   topCursor.execute( 'DROP TABLE %s' % tableName )
 57   topCursor.close()
 58   dbConnection.close()
于 2012-03-27T07:20:27.517 回答