0

After my previous thread was marked as a duplicate, it pointed me in the direction of multiprocessing managers. I'm trying to use multiprocessing to create a service that handles my pandas dataframe to give to Flask requests. This is my code so far:

df_manager.py

from multiprocessing.managers import BaseManager
import pandas as pd

def init_dataframe():
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})

def get_df():
    return df

df = init_dataframe()
manager = BaseManager(('', 37844), b'password')
manager.register('get_df', get_df)
server = manager.get_server()
server.serve_forever()

data_handler.py

from multiprocessing.managers import BaseManager
import pandas as pd

def get_df():
    manager = BaseManager(('', 37844), b'password')
    manager.register('get_df')
    manager.connect()
    return manager.get_df()

def data():
    df = get_df()
    return df.to_dict()

if __name__ == '__main__':
    data()

Unfortunately, this throws an exception when attempting to call manager.get_df() in data_handler.py.

Traceback (most recent call last):
  File "src/data_handler.py", line 15, in <module>
    data()
  File "src/data_handler.py", line 11, in data
    df = get_df()
  File "src/data_handler.py", line 8, in get_df
    return manager.get_df()
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 724, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 609, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 82, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 201, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 391, in create
    exposed = public_methods(obj)
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 122, in public_methods
    return [name for name in all_methods(obj) if name[0] != '_']
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 113, in all_methods
    func = getattr(obj, name)
  File "/home/admin/dev/pandas-multiprocessing/venv/lib/python3.7/site-packages/pandas/core/frame.py", line 392, in _constructor_expanddim
    raise NotImplementedError("Not supported for DataFrames!")
NotImplementedError: Not supported for DataFrames!
---------------------------------------------------------------------------

Any help in the right direction would be much appreciated!

EDIT: This seems to be caused by DataFrames specifically, as returning df.to_json() instead of just df in df_manager.py seems to work fine. Still investigating...

EDIT2: I have updated the code to remove the Flask dependency, as it has seemingly nothing to do with it.

Git repo

4

1 回答 1

0

This issue is fixed by exposing the relevant methods to the proxy used by BaseManager. This can be done in the register call in data_handler.py.

df_manager.py

from multiprocessing.managers import BaseManager
import pandas as pd

def init_dataframe():
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})

def get_df():
    return df

df = init_dataframe()
manager = BaseManager(('', 37844), b'password')
manager.register('get_df', callable=get_df, exposed='get_df') # Adding `exposed` parameter was the key to solving the issue
server = manager.get_server()
server.serve_forever()

data_handler.py

from multiprocessing.managers import BaseManager
import pandas as pd

def get_df():
    manager = BaseManager(('', 37844), b'password')
    manager.register('get_df')
    manager.connect()
    return manager.get_df()

def data():
    df = get_df()
    return df

if __name__ == '__main__':
    print(data())
于 2019-12-22T23:53:49.270 回答