
I'm writing some Python code that needs to run over a large amount of data, so I decided to use the multiprocessing module. But I've found that I get different results when I vary the number of processes, which is obviously undesirable.

There is some randomness in my code, but I fix the random seed as soon as the program starts executing in the main process (the seed is only used to select the labeled and unlabeled data points). The child processes have no randomness of their own; they simply compute matrix and vector operations.

Here is the relevant code. I use the appropriate numpy and scipy modules; the rest of the code is just setup and is not relevant to the problem at hand.

The function update_pq_wrapper spawns the child processes. The functions update_p and update_q are the ones called on subsets of the data; update_pq_wrapper collects the results from the child processes and stitches them together. W is a global variable representing the affinity matrix.

def update_p(startIdx, endIdx, betaMatrix, out_q):
  p = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_colsum = W.sum(1)
  for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1                                                                                                                                                                            
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)                                                                                                                                                           
    g = gamma(W_colsum, i)
    dist = np.exp(betaMatrix[i,:] / g)
    dist /= np.sum(dist)
    p[i-startIdx,:] = dist
  out_q.put(p)

def update_q(startIdx, endIdx, sumMatrix, out_q):
  q = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_rowsum = W.sum(0)
  for i in xrange(startIdx, endIdx):
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)
    labeled = 1 if L[i] else 0
    dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
    dist /= np.sum(dist)
    q[i-startIdx,:] = dist
  out_q.put(q)

def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
  startIdx = xrange(0, dataset_size, chunksize)
  prevIter = 1 - curIter
  procs = []
  out_q = Queue()
  for i in range(ncores):    #now, start each of the child processes that assembles the matrices individually                                                                                                                               
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    if update_step == 'p':
        proc = Process(target=update_p, args=(start, end, mat, out_q))
    else:
        proc = Process(target=update_q, args=(start, end, mat, out_q))
    procs.append(proc)
    proc.start()
  if update_step == 'p':    #once completed, collect results                                                                                                                                                                                
    distMat = P1 if curIter else P0
  else:
    distMat = Q1 if curIter else Q0
  for i in range(ncores):
    p_chunk = out_q.get()
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    distMat[start:end,:] = p_chunk
  for proc in procs:
    proc.join()

Now, if I run this with just 1 process, I get the following results (accuracy over 5 iterations): 2.16 --> 26.56 --> 27.37 --> 27.63 --> 27.83

However, if I run it with 2 processes, I get the following: 2.16 --> 3.72 --> 18.74 --> 14.81 --> 16.51

And with 4 processes: 2.16 --> 13.78 --> 13.85 --> 15.67 --> 13.12

I'm not sure what's causing this behavior, especially given the code pasted above.

Avneesh

EDIT (3:34 PM PST, January 15, 2013):

At the request of some commenters, I've copy-pasted the entire code below, along with a brief explanation of exactly what it is meant to do.

The basic idea is that I have a graph, represented by an affinity matrix W. Each node carries a probability distribution over the set of possible labels. A labeled example therefore has a degenerate distribution: its row has value 1 in the column corresponding to the label and 0 everywhere else. For an unlabeled node, the resulting label is a distribution, and one can take the MAP point estimate of that distribution to obtain the label for that point. For more details on the approach, see here. The objective function is solved with a technique known as alternating minimization, in which two probability distributions (p and q in the code) are proposed, and we iterate until the distributions converge to the same value.

As one of the commenters suggested, I moved the proc.join() part so that it comes before the operations that currently happen after the join. That seems to cause the code to never get past the stage of spawning the child processes, forcing me to interrupt execution from the keyboard. Perhaps I'm just doing something wrong.
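A likely explanation for that hang, given the warning in the multiprocessing documentation about joining processes that use queues: a child that has put a large item on a Queue will not exit until that data has been consumed at the other end, so joining the children before draining the queue can deadlock. A minimal sketch of the two orderings, reusing the names from the code above:

#deadlock-prone ordering: each child blocks inside out_q.put() while
#flushing its large array, so proc.join() never returns and the gets
#below are never reached
#  for proc in procs:
#    proc.join()
#  chunks = [out_q.get() for _ in range(ncores)]

#safe ordering: drain the queue first, then join
chunks = [out_q.get() for _ in range(ncores)]
for proc in procs:
  proc.join()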

#!/usr/bin/python
import sys, commands, string, cPickle
import numpy as np
import scipy.sparse as sp
import scipy.stats as stats
import scipy.linalg as la
from math import ceil
from time import clock
from multiprocessing import Process, Queue
from Queue import Empty

np.random.seed(42)
if not len(sys.argv) == 9:
  print 'ERROR: Usage: python alternating_minimization.py <binary data or sim matrix> <labels_file> <iterations> <num cores> <label percent> <v> <mu> <alpha>'
  sys.exit()

########################################                                                                                                                                                                                                      
# Main Parameters                                                                                                                                                                                                                             
########################################                                                                                                                                                                                                      
similarity_file = sys.argv[1] #output of simgraph_construction.py                                                                                                                                                                             
labels_file = sys.argv[2]
niterations = int(sys.argv[3])
ncores = int(sys.argv[4])

########################################                                                                                                                                                                                                      
# meta parameters                                                                                                                                                                                                                             
########################################                                                                                                                                                                                                      
label_percent = float(sys.argv[5])
v = float(sys.argv[6])
mu = float(sys.argv[7])
alpha = float(sys.argv[8])

########################################                                                                                                                                                                                                      
# load the data file (output of simgraph_construction.py) which is already in numpy format                                                                                                                                                    
########################################                                                                                                                                                                                                      
W = cPickle.load(open(similarity_file, 'r'))
#print some summary statistics about the similarity matrix file                                                                                                                                                                               
print "Number of examples: %d"%(W.shape[0])
print "Sim Matrix: nnz = %d, density = %.2f percent, average # of neighbors per example: %.2f"%(W.nnz, 100*(float(W.nnz)/(W.shape[0]**2)), float(W.nnz)/W.shape[0])

########################################                                                                                                                                                                                                      
# load the labels                                                                                                                                                                                                                             
########################################                                                                                                                                                                                                      
def convertLabels(labels):
  unique_labels = np.unique(labels)
  label_dict = {}
  idx = 0
  for label in unique_labels:
    label_dict[label] = idx
    idx += 1
  return label_dict

LABELS = np.load(labels_file)
print "Number of unique labels: %d"%(np.unique(LABELS).shape)
label_dict = convertLabels(LABELS)
NEW_LABELS = np.array([label_dict[label] for label in LABELS])
dataset_size = LABELS.shape[0]
LABELS = NEW_LABELS
W = W + alpha*sp.identity(dataset_size)

########################################                                                                                                                                                                                                      
# define the labeled and unlabeled idxs                                                                                                                                                                                                       
########################################                                                                                                                                                                                                      
def make_test_set():
  idx = np.random.rand(dataset_size)
  l = (idx < label_percent)
  u = (idx >= label_percent)
  return l,u

L,U = make_test_set()
def createRDistribution(label_bool, labels):
  rows = np.array(range(0, dataset_size), dtype=int)
  label_idx = np.where(~label_bool)
  rows = np.delete(rows, label_idx)
  cols = np.delete(labels, label_idx)
  vals = np.ones((rows.shape[0],1)).ravel()
  sparseR = sp.csc_matrix((vals, (rows, cols)), shape=(dataset_size, max(labels)+1))
  return sparseR

########################################                                                                                                                                                                                                      
# make the distributions for the data                                                                                                                                                                                                         
########################################                                                                                                                                                                                                      
R = createRDistribution(L, LABELS) #labeled distribution is sparse                                                                                                                                                                            
classNoLabel = np.where(R.sum(0) == 0)
#print classNoLabel #need to figure out how many classes are unrepresented in the labeld set                                                                                                                                                  
Q0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
Q0 += 1.0 / Q0.shape[1]
Q1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)

def gamma(W_sum,i): #W_sum is sum across all columns of sim matrix W                                                                                                                                                                          
  return v + mu * W_sum[i]

def update_p(startIdx, endIdx, betaMatrix, out_q):
  p = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_colsum = W.sum(1)
  for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1                                                                                                                                                                            
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)                                                                                                                                                           
    g = gamma(W_colsum, i)
    dist = np.exp(betaMatrix[i,:] / g)
    dist /= np.sum(dist)
    p[i-startIdx,:] = dist
  out_q.put(p)

def update_q(startIdx, endIdx, sumMatrix, out_q):
  q = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_rowsum = W.sum(0)
  for i in xrange(startIdx, endIdx):
    dist = np.zeros(max(LABELS)+1, dtype=np.float64)
    labeled = 1 if L[i] else 0
    dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
    dist /= np.sum(dist)
    q[i-startIdx,:] = dist
  out_q.put(q)

def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
  startIdx = xrange(0, dataset_size, chunksize)
  prevIter = 1 - curIter
  procs = []
  out_q = Queue()
  for i in range(ncores):    #now, start each of the child processes that assembles the matrices individually                                                                                                                               
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    if update_step == 'p':
        proc = Process(target=update_p, args=(start, end, mat, out_q))
    else:
        proc = Process(target=update_q, args=(start, end, mat, out_q))
    procs.append(proc)
    proc.start()
  if update_step == 'p':    #once completed, collect results                                                                                                                                                                                
    distMat = P1 if curIter else P0
  else:
    distMat = Q1 if curIter else Q0
  for i in range(ncores):
    p_chunk = out_q.get()
    start = startIdx[i]
    end = dataset_size if i == ncores-1 else startIdx[i+1]
    distMat[start:end,:] = p_chunk
  for proc in procs:
    proc.join()

def compute_tvdist(P,Q):
  tv_dist = 0
  for i in range(0, dataset_size):
    tv_dist += max(np.absolute(P[i,:] - Q[i,:]))
  return tv_dist/dataset_size

def main(argv):
  accuracyArr = []
  tvdistArr = []
  print >> sys.stderr, 'Starting %d iterations...' % niterations
  chunksize = int(ceil(dataset_size/float(ncores)))
  for n in xrange(1,niterations+1):
    print >> sys.stderr, 'Iteration %d' % n
    idx = n % 2
    q_prev = Q1 if not idx else Q0
    p_cur = P1 if idx else P0
    #print q_prev                                                                                                                                                                                                                         
    start_time = clock()
    mat = -v + mu*(W*(np.log(q_prev)-1))
    end_time = clock()
    #print mat                                                                                                                                                                                                                            
    print "Time taken to compute Beta Matrix: %.2f seconds"%(end_time-start_time)
    start_time=clock()
    update_pq_wrapper(idx, chunksize, mat, 'p')
    end_time=clock()
    print "Time taken to update P matrix: %.2f seconds"%(end_time-start_time)
    if not n == niterations:
      start_time = clock()
      mat = W.T*p_cur
      end_time = clock()
      print "Time taken to compute Sum Matrix: %.2f seconds"%(end_time-start_time)
      start_time = clock()
      update_pq_wrapper(idx, chunksize, mat, 'q')
      end_time = clock()
      print "Time taken to update Q matrix: %.2f seconds"%(end_time-start_time)
    ## Evaluation ##                                                                                                                                                                                                                      
    evalMat = P1 if idx else P0                                                                                                                                                   
    predLabel = np.argmax(evalMat, axis=1) #gives the index (column)                                                                                                                                                                      
    accuracy = float(np.sum(predLabel[np.where(U)] == LABELS[np.where(U)]) )/ LABELS[np.where(U)].shape[0]
    print "Accuracy: %.2f"%(accuracy*100)
    accuracyArr.append(accuracy)
    totalVar = []
    if n != niterations:
        tv_dist = compute_tvdist(P1, Q1) if idx else compute_tvdist(P0, Q0)
    else:
        tv_dist = compute_tvdist(P1, Q0) if idx else compute_tvdist(P0, Q1)
    print "Average Total Variation Distance is %.3f"%(tv_dist)
    tvdistArr.append(tv_dist)
  print "Summary of final probability density matrix: "
  print evalMat
  print '\t'.join([str(round(acc,4)) for acc in accuracyArr])

1 Answer


This has been solved. I don't know what I was thinking, but clearly the data coming back from the child processes is written asynchronously, not in the order I expected (the order of the rows). A simple fix is to have each child process return its chunk of data in a tuple together with an indicator of which portion of the data it was working on, and then use that information to piece the resulting matrix back together.
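For reference, a minimal sketch of that fix, reusing the names from the question (the chunk_id argument is an addition): each worker tags the chunk it computed with the index it was assigned, and the wrapper places chunks by their tag rather than by the order in which they come off the queue.

def update_p(chunk_id, startIdx, endIdx, betaMatrix, out_q):
  p = np.zeros((endIdx-startIdx, max(LABELS)+1))
  W_colsum = W.sum(1)
  for i in xrange(startIdx, endIdx):
    g = gamma(W_colsum, i)
    dist = np.exp(betaMatrix[i,:] / g)
    dist /= np.sum(dist)
    p[i-startIdx,:] = dist
  out_q.put((chunk_id, p)) #tag the result with the index of this chunk

In update_pq_wrapper, the spawning call becomes Process(target=update_p, args=(i, start, end, mat, out_q)), and the collection loop places each result by its tag:

  for _ in range(ncores):
    chunk_id, p_chunk = out_q.get() #chunks arrive in completion order
    start = startIdx[chunk_id]     #so place each one by its tag, not by i
    end = dataset_size if chunk_id == ncores-1 else startIdx[chunk_id+1]
    distMat[start:end,:] = p_chunk
  for proc in procs:
    proc.join()

update_q would need the same change.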

answered 2013-02-16T21:30:43.570