I have a sample dataset.

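As the code shows, every call to fit_by_idx() should print 'here', but that is not what happens. Everything works when n_jobs=1, but when n_jobs is greater than 1, joblib does not seem to call the function.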

Code:

import statsmodels.tsa.holtwinters as holtwinters
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

train = pd.read_csv('train.csv').drop(columns=['id'])


def iter_predict(data, model, steps, fit_args=[], fit_kwargs={}):  # steps: number of points to forecast
    def fit_by_idx(idx):
        print('here')
        endog = data.iloc[idx]
        fitted = model(endog).fit(*fit_args, optimized=False, **fit_kwargs)
        res[idx, :] = fitted.forecast(steps)

    res = np.zeros((data.shape[0], steps))
    Parallel(n_jobs=2)(delayed(fit_by_idx)(idx) for idx in range(data.shape[0]))
    return res

iter_predict(train, holtwinters.SimpleExpSmoothing, 2, fit_kwargs={'smoothing_level': 0.5})

Here is a link to the dataset.

1 Answer

Q: "if n_jobs is greater, joblib does not call the function"

Well, it does call it (you can check the PID and PPID numbers);
it just does not show the output of print( "here" ).

Using the definition from the API documentation:

print( *objects, sep = ' ', end = '\n', file = sys.stdout, flush = False )

start by enforcing flush = True.
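To illustrate what the flush argument changes, here is a minimal sketch using only the standard library. CountingStream is a hypothetical helper invented for this illustration (it is not part of joblib or of the original code); it simply counts how many times print() asks the stream to flush. Whether buffering is the only reason the output goes missing in a given environment is an assumption that should be checked on the actual setup.

```python
import io


class CountingStream(io.StringIO):
    """Hypothetical in-memory text stream that counts flush() calls."""

    def __init__(self):
        super().__init__()
        self.flush_calls = 0

    def flush(self):
        self.flush_calls += 1
        super().flush()


stream = CountingStream()

# Default call: print() writes to the stream but does not request a flush.
print("here", file=stream)
calls_without_flush = stream.flush_calls

# With flush=True, print() calls stream.flush() right after writing.
print("here", file=stream, flush=True)
calls_with_flush = stream.flush_calls
```

Inside fit_by_idx() the same idea reduces to writing print('here', flush=True), so that a worker does not keep the text in its own output buffer.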

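However, more trouble lies ahead. joblib spawns separate processes (unless it is forced to do otherwise, which comes at a cost to performance: if execution falls back to pure-[SERIAL], GIL-controlled code, any number of n_jobs again runs one step after another. That makes no sense, because you pay all the costs of instantiation and other overheads, yet gain no speedup from doing so, do you?). Use: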

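import os
import numpy as np
from joblib import Parallel, delayed, parallel_backend

N_JOBS = 2  # assumed value; the original snippet does not define N_JOBS

def iter_preDEMO( data,            # Pandas DF-alike data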
                  #other args removed for MCVE-clarity
                  ):

    def fit_by_idx( idx ): #-------------------------------------[FUNCTION]-def-<start> To be transferred to each remote-joblib-initiated process(es)

        print( 'here[{0:_>4d}(PPID:PID={1:_>7d}:{2::>7d})]'.format( idx,
                                                                    os.getppid(), # test joblib-[FUNCTION]-def-transfer here with: lambda x = "_{0:}_" : x.format( os.getppid() )
                                                                    os.getpid()   # test joblib-[FUNCTION]-def-transfer here with: lambda x = "_{0:}_" : x.format( os.getpid()  )
                                                                    ),
                end   = "\t",
                flush = True
                )
    #------------------------------------------------------------[FUNCTION]-def-<end>

    res = np.zeros( ( data.shape[0], 3 ) )
    for aBackEND in ( 'threading', 'loky', 'multiprocessing' ):
        try:
             print( "\n____________________________Going into ['{0:}']-backend".format( aBackEND ) )
             with parallel_backend( aBackEND, n_jobs = N_JOBS ):
                  Parallel( n_jobs = N_JOBS )( delayed( fit_by_idx )( pickled_SER_DES_copy_of_idx )
                                               for                    pickled_SER_DES_copy_of_idx in range( data.shape[0] )
                                               )
        finally:
             print( "\n_____________________________Exit from ['{0:}']-backend".format( aBackEND ) )
    return res

You will see how things work from the more detailed print()-ed output:

START: PID=_____22528

____________________________Going into ['threading']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['threading']-backend

____________________________Going into ['loky']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['loky']-backend

____________________________Going into ['multiprocessing']-backend
here[___0(PPID:PID=__22527:::22528)]    here[___1(PPID:PID=__22527:::22528)]    here[___2(PPID:PID=__22527:::22528)]    here[___3(PPID:PID=__22527:::22528)]    here[___4(PPID:PID=__22527:::22528)]    here[___5(PPID:PID=__22527:::22528)]    here[___6(PPID:PID=__22527:::22528)]    here[___7(PPID:PID=__22527:::22528)]    here[___8(PPID:PID=__22527:::22528)]    here[___9(PPID:PID=__22527:::22528)]    here[__10(PPID:PID=__22527:::22528)]    here[__11(PPID:PID=__22527:::22528)]    here[__12(PPID:PID=__22527:::22528)]    here[__13(PPID:PID=__22527:::22528)]    here[__14(PPID:PID=__22527:::22528)]    here[__15(PPID:PID=__22527:::22528)]    here[__16(PPID:PID=__22527:::22528)]    
_____________________________Exit from ['multiprocessing']-backend

 [[0. 0. 0.]
  [0. 0. 0.]
  ...
  ]
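The all-zero res above hints at the further trouble: as far as I understand, with the process-based backends each worker receives a pickled copy of the closure, so an assignment such as res[idx, :] = ... happens on the worker's copy and never reaches the array in the parent process. A common way around this is to return each row from the worker and assemble the array in the parent, relying on Parallel returning results in input order. The sketch below assumes exactly that; forecast_row and iter_predict_returning are hypothetical names, and forecast_row is only a stand-in for model(endog).fit(...).forecast(steps), not the original statsmodels call.

```python
import numpy as np
from joblib import Parallel, delayed


def forecast_row(row, steps):
    """Hypothetical stand-in for model(endog).fit(...).forecast(steps).

    Returns the row mean repeated `steps` times, so the example has no
    dependency on statsmodels.
    """
    return np.full(steps, row.mean())


def iter_predict_returning(data, steps, n_jobs=2):
    """Compute one forecast per row in parallel and collect them in the parent."""
    # Each worker RETURNS its result instead of writing into a shared array.
    rows = Parallel(n_jobs=n_jobs)(
        delayed(forecast_row)(data[idx], steps) for idx in range(data.shape[0])
    )
    # Parallel yields results in the order of the inputs, so stacking is safe.
    return np.vstack(rows)


if __name__ == "__main__":
    demo = np.arange(12, dtype=float).reshape(4, 3)
    print(iter_predict_returning(demo, steps=2))
```

With a pandas DataFrame, as in the question, data.iloc[idx] would take the place of data[idx].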

Also check this on your own operating system and with your actual versions of joblib and the (hidden) pickling (SER/DES) tools.

Answered 2020-04-30T04:03:25.123