我最近玩弄了 Python 的多处理模块来加速隐马尔可夫模型的前向后向算法,因为前向过滤和后向过滤可以独立运行。看到运行时减半是令人敬畏的事情。
我现在尝试在我的迭代维特比算法中包含一些多处理。在这个算法中,我试图运行的两个进程不是独立的。val_max 部分可以独立运行,但 arg_max[t] 取决于 val_max[t-1]。因此,我提出了一个想法,即可以将 val_max 作为一个单独的进程运行,然后将 arg_max 也作为一个可以由 val_max 提供的单独进程运行。
我承认在这里有点超出我的深度,除了观看一些基本视频以及浏览博客之外,我对多处理了解不多。我在下面提供了我的尝试,但它不起作用。
import numpy as np
from time import time,sleep
import multiprocessing as mp
class Viterbi:
def __init__(self,A,B,pi):
self.M = A.shape[0] # number of hidden states
self.A = A # Transition Matrix
self.B = B # Observation Matrix
self.pi = pi # Initial distribution
self.T = None # time horizon
self.val_max = None
self.arg_max = None
self.obs = None
self.sleep_time = 1e-6
self.output = mp.Queue()
def get_path(self,x):
# returns the most likely state sequence given observed sequence x
# using the Viterbi algorithm
self.T = len(x)
self.val_max = np.zeros((self.T, self.M))
self.arg_max = np.zeros((self.T, self.M))
self.val_max[0] = self.pi*self.B[:,x[0]]
for t in range(1, self.T):
# Indepedent Process
self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0 )
# Dependent Process
self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
# BACKTRACK
states = np.zeros(self.T, dtype=np.int32)
states[self.T-1] = np.argmax(self.val_max[self.T-1])
for t in range(self.T-2, -1, -1):
states[t] = self.arg_max[t+1, states[t+1]]
return states
def get_val(self):
'''Independent Process'''
for t in range(1,self.T):
self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,self.obs[t]]) , axis = 0 )
self.output.put(self.val_max)
def get_arg(self):
'''Dependent Process'''
for t in range(1,self.T):
while 1:
# Process info if available
if self.val_max[t-1].any() != 0:
self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
break
# Else sleep and wait for info to arrive
sleep(self.sleep_time)
self.output.put(self.arg_max)
def get_path_parallel(self,x):
self.obs = x
self.T = len(obs)
self.val_max = np.zeros((self.T, self.M))
self.arg_max = np.zeros((self.T, self.M))
val_process = mp.Process(target=self.get_val)
arg_process = mp.Process(target=self.get_arg)
# get first initial value for val_max which can feed arg_process
self.val_max[0] = self.pi*self.B[:,obs[0]]
arg_process.start()
val_process.start()
arg_process.join()
val_process.join()
注意:get_path_parallel 还没有回溯。
看起来 val_process 和 arg_process 从未真正运行过。真的不知道为什么会这样。您可以运行维特比算法的维基百科示例中的代码。
obs = np.array([0,1,2]) # normal then cold and finally dizzy
pi = np.array([0.6,0.4])
A = np.array([[0.7,0.3],
[0.4,0.6]])
B = np.array([[0.5,0.4,0.1],
[0.1,0.3,0.6]])
viterbi = Viterbi(A,B,pi)
path = viterbi.get_path(obs)
我也尝试过使用 Ray。然而,我不知道我在那里到底在做什么。你能帮我推荐一下如何让并行版本运行吗?我一定做错了什么,但我不知道是什么。
您的帮助将不胜感激。