我正在编写一些 Python 代码,并且需要让它运行大量数据,因此我决定使用多处理模块。但我发现,当我改变进程数量时,我会得到不同的结果,这显然是不可取的。
我的代码中有一些随机性,但是我在中央进程中执行程序后立即修复随机种子(随机种子仅用于选择标记和未标记的数据点)。子过程没有随机性,只是简单地计算矩阵或向量运算。
这是相关的代码。我使用相关的 numpy 和 scipy 模块。剩下的代码只是设置,与手头的问题无关。
该函数update_pq_wrapper
产生子进程。函数update_p
和update_q
是在数据子集上调用的函数。 update_pq_wrapper
从子过程中收集结果并将它们放在一起。W
是表示亲和矩阵的全局变量。
def update_p(startIdx, endIdx, betaMatrix, out_q):
p = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_colsum = W.sum(1)
for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
g = gamma(W_colsum, i)
dist = np.exp(betaMatrix[i,:] / g)
dist /= np.sum(dist)
p[i-startIdx,:] = dist
out_q.put(p)
def update_q(startIdx, endIdx, sumMatrix, out_q):
q = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_rowsum = W.sum(0)
for i in xrange(startIdx, endIdx):
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
labeled = 1 if L[i] else 0
dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
dist /= np.sum(dist)
q[i-startIdx,:] = dist
out_q.put(q)
def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
startIdx = xrange(0, dataset_size, chunksize)
prevIter = 1 - curIter
procs = []
out_q = Queue()
for i in range(ncores): #now, start each of the child processes that assembles the matrices individually
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
if update_step == 'p':
proc = Process(target=update_p, args=(start, end, mat, out_q))
else:
proc = Process(target=update_q, args=(start, end, mat, out_q))
procs.append(proc)
proc.start()
if update_step == 'p': #once completed, collect results
distMat = P1 if curIter else P0
else:
distMat = Q1 if curIter else Q0
for i in range(ncores):
p_chunk = out_q.get()
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
distMat[start:end,:] = p_chunk
for proc in procs:
proc.join()
现在,如果我只用 1 个进程运行它,我会得到以下结果(5 次迭代的准确性): 2.16 --> 26.56 --> 27.37 --> 27.63 --> 27.83
但是,如果我用 2 个进程运行它,我会得到以下结果: 2.16 --> 3.72 --> 18.74 --> 14.81 --> 16.51
并有 4 个进程:2.16 --> 13.78 --> 13.85 --> 15.67 --> 13.12
我不确定这种行为的原因是什么,尤其是考虑到上面粘贴的代码。
阿夫内什
编辑(太平洋时间 2013 年 1 月 15 日下午 3:34
根据某些人的要求,我复制粘贴了下面的整个代码,并简要说明了代码的确切用途。
基本思想是我有一个图,由一个亲和矩阵表示W
。每个节点表示每个节点的可能标签集上的概率分布。因此,对于带标签的示例,节点具有退化的概率分布,对应于标签的行的值为 1,其他任何地方的值为 0。对于未标记的节点,每个节点的结果标签是一个分布,可以采用该分布的 MAP 点估计来获得该点的标签。有关该方法的更多详细信息,请参阅此处。使用称为交替最小化的技术求解目标函数,其中提出了 2 个概率分布(p
在q
代码中),我们迭代直到分布收敛到相同的值。
正如其中一位评论者所建议的那样,我移动了该proc.join()
部分,使其高于加入后发生的操作。这似乎导致代码无法超越生成子进程的阶段,迫使我从键盘中断执行。也许我只是做错了什么。
#!/usr/bin/python
import sys, commands, string, cPickle
import numpy as np
import scipy.sparse as sp
import scipy.stats as stats
import scipy.linalg as la
from math import ceil
from time import clock
from multiprocessing import Process, Queue
from Queue import Empty
np.random.seed(42)
if not len(sys.argv) == 9:
print 'ERROR: Usage: python alternating_minimization.py <binary data or sim matrix> <labels_file> <iterations> <num cores> <label percent> <v> <mu> <alpha>'
sys.exit()
########################################
# Main Parameters
########################################
similarity_file = sys.argv[1] #output of simgraph_construction.py
labels_file = sys.argv[2]
niterations = int(sys.argv[3])
ncores = int(sys.argv[4])
########################################
# meta parameters
########################################
label_percent = float(sys.argv[5])
v = float(sys.argv[6])
mu = float(sys.argv[7])
alpha = float(sys.argv[8])
########################################
# load the data file (output of simgraph_construction.py) which is already in numpy format
########################################
W = cPickle.load(open(similarity_file, 'r'))
#print some summary statistics about the similarity matrix file
print "Number of examples: %d"%(W.shape[0])
print "Sim Matrix: nnz = %d, density = %.2f percent, average # of neighbors per example: %.2f"%(W.nnz, 100*(float(W.nnz)/(W.shape[0]**2)), float(W.nnz)/W.shape[0])
########################################
# load the labels
########################################
def convertLabels(labels):
unique_labels = np.unique(labels)
label_dict = {}
idx = 0
for label in unique_labels:
label_dict[label] = idx
idx += 1
return label_dict
LABELS = np.load(labels_file)
print "Number of unique labels: %d"%(np.unique(LABELS).shape)
label_dict = convertLabels(LABELS)
NEW_LABELS = np.array([label_dict[label] for label in LABELS])
dataset_size = LABELS.shape[0]
LABELS = NEW_LABELS
W = W + alpha*sp.identity(dataset_size)
########################################
# define the labeled and unlabeled idxs
########################################
def make_test_set():
idx = np.random.rand(dataset_size)
l = (idx < label_percent)
u = (idx >= label_percent)
return l,u
L,U = make_test_set()
def createRDistribution(label_bool, labels):
rows = np.array(range(0, dataset_size), dtype=int)
label_idx = np.where(~label_bool)
rows = np.delete(rows, label_idx)
cols = np.delete(labels, label_idx)
vals = np.ones((rows.shape[0],1)).ravel()
sparseR = sp.csc_matrix((vals, (rows, cols)), shape=(dataset_size, max(labels)+1))
return sparseR
########################################
# make the distributions for the data
########################################
R = createRDistribution(L, LABELS) #labeled distribution is sparse
classNoLabel = np.where(R.sum(0) == 0)
#print classNoLabel #need to figure out how many classes are unrepresented in the labeld set
Q0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
Q0 += 1.0 / Q0.shape[1]
Q1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P0 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
P1 = np.zeros((dataset_size, max(LABELS)+1), dtype=np.double)
def gamma(W_sum,i): #W_sum is sum across all columns of sim matrix W
return v + mu * W_sum[i]
def update_p(startIdx, endIdx, betaMatrix, out_q):
p = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_colsum = W.sum(1)
for i in xrange(startIdx, endIdx): #for i 0 to numExamples - 1
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
g = gamma(W_colsum, i)
dist = np.exp(betaMatrix[i,:] / g)
dist /= np.sum(dist)
p[i-startIdx,:] = dist
out_q.put(p)
def update_q(startIdx, endIdx, sumMatrix, out_q):
q = np.zeros((endIdx-startIdx, max(LABELS)+1))
W_rowsum = W.sum(0)
for i in xrange(startIdx, endIdx):
dist = np.zeros(max(LABELS)+1, dtype=np.float64)
labeled = 1 if L[i] else 0
dist = (R[i,:]*labeled + mu*sumMatrix[i,:]) / (labeled + mu*W_rowsum[0,i])
dist /= np.sum(dist)
q[i-startIdx,:] = dist
out_q.put(q)
def update_pq_wrapper(curIter, chunksize, mat, update_step='p'):
startIdx = xrange(0, dataset_size, chunksize)
prevIter = 1 - curIter
procs = []
out_q = Queue()
for i in range(ncores): #now, start each of the child processes that assembles the matrices individually
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
if update_step == 'p':
proc = Process(target=update_p, args=(start, end, mat, out_q))
else:
proc = Process(target=update_q, args=(start, end, mat, out_q))
procs.append(proc)
proc.start()
if update_step == 'p': #once completed, collect results
distMat = P1 if curIter else P0
else:
distMat = Q1 if curIter else Q0
for i in range(ncores):
p_chunk = out_q.get()
start = startIdx[i]
end = dataset_size if i == ncores-1 else startIdx[i+1]
distMat[start:end,:] = p_chunk
for proc in procs:
proc.join()
def compute_tvdist(P,Q):
tv_dist = 0
for i in range(0, dataset_size):
tv_dist += max(np.absolute(P[i,:] - Q[i,:]))
return tv_dist/dataset_size
def main(argv):
accuracyArr = []
tvdistArr = []
print >> sys.stderr, 'Starting %d iterations...' % niterations
chunksize = int(ceil(dataset_size/float(ncores)))
for n in xrange(1,niterations+1):
print >> sys.stderr, 'Iteration %d' % n
idx = n % 2
q_prev = Q1 if not idx else Q0
p_cur = P1 if idx else P0
#print q_prev
start_time = clock()
mat = -v + mu*(W*(np.log(q_prev)-1))
end_time = clock()
#print mat
print "Time taken to compute Beta Matrix: %.2f seconds"%(end_time-start_time)
start_time=clock()
update_pq_wrapper(idx, chunksize, mat, 'p')
end_time=clock()
print "Time taken to update P matrix: %.2f seconds"%(end_time-start_time)
if not n == niterations:
start_time = clock()
mat = W.T*p_cur
end_time = clock()
print "Time taken to compute Sum Matrix: %.2f seconds"%(end_time-start_time)
start_time = clock()
update_pq_wrapper(idx, chunksize, mat, 'q')
end_time = clock()
print "Time taken to update Q matrix: %.2f seconds"%(end_time-start_time)
## Evaluation ##
evalMat = P1 if idx else P0
predLabel = np.argmax(evalMat, axis=1) #gives the index (column)
accuracy = float(np.sum(predLabel[np.where(U)] == LABELS[np.where(U)]) )/ LABELS[np.where(U)].shape[0]
print "Accuracy: %.2f"%(accuracy*100)
accuracyArr.append(accuracy)
totalVar = []
if n != niterations:
tv_dist = compute_tvdist(P1, Q1) if idx else compute_tvdist(P0, Q0)
else:
tv_dist = compute_tvdist(P1, Q0) if idx else compute_tvdist(P0, Q1)
print "Average Total Variation Distance is %.3f"%(tv_dist)
tvdistArr.append(tv_dist)
print "Summary of final probability density matrix: "
print evalMat
print '\t'.join([str(round(acc,4)) for acc in accuracyArr])