
TL;DR: We're running into problems parallelizing our Pandas code with Dask when it reads from and writes to the same HDF file.

I'm working on a project that generally involves three steps: reading, translating (or combining), and writing data. For context, we work with medical records: we receive claims in various formats, translate them into a standardized format, and then write them back out to disk. Ideally, I'd like to save the intermediate datasets in some form that I can later access via Python/Pandas.

Currently, I've chosen HDF as my data storage format, but I'm running into runtime problems. On a large population, my code can currently take days to run. This has led me to look into Dask, but I'm not sure I've applied Dask to my situation in the best way.

Below is a working example of my workflow, hopefully with enough sample data to give a sense of the runtime issues.

Read in (in this case, create) the data

import pandas as pd
import numpy as np
import dask
from dask import delayed
from dask import dataframe as dd
import random
from datetime import timedelta
from pandas.io.pytables import HDFStore

member_id = range(1, 10000)
window_start_date = pd.to_datetime('2015-01-01')
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id]

# Eligibility records
eligibility = pd.DataFrame({'member_id': member_id,
                            'start_date': start_date_col})
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365)
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6])
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4])
(eligibility.set_index('member_id')
 .to_hdf('test_data.h5',
         key='eligibility',
         format='table'))

# Inpatient records
inpatient_record_number = range(1, 20000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number]
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number,
                          'service_date': service_date})
inpatient['member_id'] = np.random.choice(list(range(1, 10000)), len(inpatient_record_number))
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number))
(inpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='inpatient',
         format='table'))

# Outpatient records
outpatient_record_number = range(1, 30000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number]
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number,
                           'service_date': service_date})
outpatient['member_id'] = np.random.choice(range(1, 10000), len(outpatient_record_number))
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number))
(outpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='outpatient',
         format='table'))

Translate/write the data

Sequential approach

def pull_member_data(member_i):
    inpatient_slice = pd.read_hdf('test_data.h5', 'inpatient', where='index == "{}"'.format(member_i))
    outpatient_slice = pd.read_hdf('test_data.h5', 'outpatient', where='index == "{}"'.format(member_i))
    return inpatient_slice, outpatient_slice


def create_visits(inpatient_slice, outpatient_slice):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER into medical 'visits'
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    visits_stacked = pd.concat([inpatient_slice, outpatient_slice]).reset_index().sort_values('service_date')
    visits_stacked.insert(0, 'visit_id', range(1, len(visits_stacked) + 1))
    return visits_stacked


def save_visits_to_hdf(visits_slice):
    with HDFStore('test_data.h5', mode='a') as store:
        store.append('visits', visits_slice)


# Read in the data by member_id, perform some operation
def translate_by_member(member_i):
    inpatient_slice, outpatient_slice = pull_member_data(member_i)
    visits_slice = create_visits(inpatient_slice, outpatient_slice)
    save_visits_to_hdf(visits_slice)


def run_translate_sequential():
    # Simple approach: Loop through each member sequentially
    for member_i in member_id:
        translate_by_member(member_i)

run_translate_sequential()

The code above takes roughly 9 minutes to run on my machine.

Dask approach

def create_visits_dask_version(visits_stacked):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    len_of_visits = visits_stacked.shape[0]
    visits_stacked_1 = (visits_stacked
                        .sort_values('service_date')
                        .assign(visit_id=range(1, len_of_visits + 1))
                        .set_index('visit_id')
                        )
    return visits_stacked_1


def run_translate_dask():
    # Approach 2: Dask, with individual writes to HDF
    inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
    outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
    stacked = dd.concat([inpatient_dask, outpatient_dask])
    visits = stacked.groupby('member_id').apply(create_visits_dask_version)
    visits.to_hdf('test_data_dask.h5', 'visits')

run_translate_dask()

This Dask approach runs in 13 seconds(!)

While this is a big improvement, we're generally curious about a few things:

  1. Given this simple example, is using Dask dataframes, concatenating them, and then applying groupby/apply the best approach?

  2. In reality, we have multiple processes like this that read from the same HDF and write to the same HDF. Our original codebase was structured so that the entire workflow could be run one member_id at a time. When we've tried to parallelize them, it sometimes works on small samples, but most of the time it produces segmentation faults. Are there known issues with parallelizing workflows like this that read/write HDF? We're working on producing an example of that as well, but figured we'd post this here in case it triggers suggestions (or in case this code helps someone facing a similar problem). A simplified sketch of the pattern we're trying to parallelize is included below.
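
For reference, the pattern looks roughly like the sketch below. This is simplified and illustrative (shown here with dask.delayed, which may not match exactly what we run; run_translate_parallel is a made-up name), but the key point is that every task reads from and appends to the same test_data.h5:

def run_translate_parallel(members):
    # Illustrative only: wrap the per-member translate step in dask.delayed
    # so that many members are processed concurrently. Every task does an
    # HDF read (pull_member_data) and an HDF append (save_visits_to_hdf)
    # against the same file, which is where we suspect the segfaults come from.
    tasks = [delayed(translate_by_member)(member_i) for member_i in members]
    dask.compute(*tasks)

# run_translate_parallel(member_id)  # works on small samples, usually crashes at scale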

Any and all feedback is appreciated!


1 Answer


Generally speaking, groupby-apply will be fairly slow. Working with data like this is often challenging, especially with limited memory.

Generally I recommend using the Parquet format (dask.dataframe has to_parquet and read_parquet functions). You are much less likely to get segfaults than with HDF files.
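
As a rough illustration (not tested against your data; it assumes a Parquet engine such as pyarrow or fastparquet is installed, and reuses the names from your example), the Dask version of your pipeline could write Parquet instead of HDF like this:

# Sketch: same pipeline as run_translate_dask, but writing and reading Parquet.
# Requires pyarrow or fastparquet to be installed.
inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
stacked = dd.concat([inpatient_dask, outpatient_dask])
visits = stacked.groupby('member_id').apply(create_visits_dask_version)
visits.to_parquet('visits.parquet')              # writes a directory of Parquet files
visits_back = dd.read_parquet('visits.parquet')  # lazy read for downstream steps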

answered 2017-09-29T22:17:06.223