
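I have a data frame made up of time series.

Date index | Time series 1 | Time series 2 | ... and so on

I use pyRserve to run a forecasting function in R.

I want to parallelize this with celery. I wrote the worker code along the following lines.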

import pyRserve
from celery import group

# pipeR is assumed to be registered as a celery task (e.g. via @app.task)
def pipeR(k):  # k: input variable (a column header)
    # Open the connection to R
    conn = pyRserve.connect(host='localhost', port=6311)

    # Assign the Python variable to i in the R environment
    conn.r.i = k

    # Define the forecasting function in R
    conn.voidEval('''
    forecst <- function(a)
    {
    ...# FORECASTS THE TIMESERIES IN COLUMN a OF THE DATAFRAME
    }
    ''')

    # Call the function in R
    conn.eval('forecst(i)')

group(pipeR.s(k) for k in [...list of column headers...])()

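To get parallel processing, can I use a single port for all worker processes (as I do in the code above, port 6311), or should each worker process get its own port?

I am currently getting the error

Error in socketConnection("localhost", port = port, server = TRUE, blocking = TRUE, : cannot open the connection

in R.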
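One common cause of a "cannot open the connection" error on a server socket is that the port is already bound by another process. Whether that is the cause here is an assumption. A minimal sketch for checking it from Python, using only the standard library (the helper name `port_is_free` is hypothetical):

```python
import socket

def port_is_free(port, host="localhost"):
    """Return True if a TCP server socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            # Binding failed, e.g. because the port is in use or privileged.
            return False
    return True
```

Calling `port_is_free(6311)` while Rserve is running on that port should return False, since Rserve already holds it. The check is only a snapshot: another process can take the port right after it returns.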


1 Answer


The problem was resolved when I opened a different port for each worker process...

from random import randint
import pyRserve

def pipeR(k, Frequency, Horizon, Split, wd_path):
    # Generate a random port
    port = randint(1000, 9999)

    # Start a new Rserve instance on that port from the R environment
    conn0 = pyRserve.connect(host='localhost', port=6311)
    conn0.r.port = port
    conn0.voidEval('''
        library(Rserve)
        Rserve(port = port, args = '--no-save')
    ''')

    # Connect to the new port from the Python environment
    conn = pyRserve.connect(host='localhost', port=port)

    # Assign the Python variable to i in the R environment
    conn.r.i = k

    # Define the forecasting function in R
    conn.voidEval('''
     forecst <- function(a)
     {
     ...# FORECASTS THE TIMESERIES IN COLUMN a OF THE DATAFRAME
     }
     ''')

    # Call the function in R, then close the bootstrap connection
    conn.eval('forecst(i)')
    conn0.close()
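A random port can collide with a port that another worker or an unrelated service already uses. One possible refinement, sketched here as an assumption rather than as part of the fix above, is to let the operating system hand out an unused port by binding to port 0 (the helper name `find_free_port` is hypothetical):

```python
import socket

def find_free_port(host="localhost"):
    """Ask the OS for a currently unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        # getsockname() returns (address, port) for an AF_INET socket.
        return s.getsockname()[1]

port = find_free_port()
```

Here `port = find_free_port()` would take the place of `port = randint(1000,9999)`. A small race window remains: the socket is closed before Rserve binds the port, so another process could still grab it in between.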
Answered 2015-12-24T05:14:23.200