5

我有以下从 Castra 创建的 dask 数据框:

import dask.dataframe as dd

df = dd.from_castra('data.castra', columns=['user_id','ts','text'])

产量:

                      user_id / ts                  / text
ts
2015-08-08 01:10:00   9235      2015-08-08 01:10:00   a
2015-08-08 02:20:00   2353      2015-08-08 02:20:00   b
2015-08-08 02:20:00   9235      2015-08-08 02:20:00   c
2015-08-08 04:10:00   9235      2015-08-08 04:10:00   d
2015-08-08 08:10:00   2353      2015-08-08 08:10:00   e

我想做的是:

  1. user_id按和分组ts
  2. 在 3 小时内重新采样
  3. 在重采样步骤中,任何合并的行都应该连接文本

示例输出:

                                text
user_id   ts
9235      2015-08-08 00:00:00   ac
          2015-08-08 03:00:00   d
2353      2015-08-08 00:00:00   b
          2015-08-08 06:00:00   e

我尝试了以下方法:

df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()

并得到以下错误:

TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex

我尝试传入set_index('ts')管道,但它似乎不是Series.

关于如何实现这一目标的任何想法?

TL;博士

如果它使问题变得更容易,我还可以更改我创建的 Castra DB 的格式。我目前的实现很大程度上取自这篇很棒的帖子。

我将索引(在to_df()函数中)设置如下:

df.set_index('ts',drop=False,inplace=True)

并且有:

  with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
     batches = partition_all(batch_size, f)
     df, frames = peek(map(self.to_df, batches))
     castra = Castra(S.CASTRA, template=df, categories=categories)
     castra.extend_sequence(frames, freq='3h')

以下是生成的 dtypes:

ts                datetime64[ns]
text                      object
user_id                  float64
4

2 回答 2

7

如果我们可以假设每个user-id组都可以放入内存中,那么我建议使用 dask.dataframe 来执行外部组比,然后使用 pandas 来执行每个组内的操作,如下所示。

def per_group(blk):
    return blk.groupby('ts').text.resample('3H', how='sum')

df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()

这将两个困难的事情解耦到两个不同的项目中

  1. dask.dataframe 处理将所有用户 id 混洗到正确的组中
  2. 在每个组中进行复杂的日期时间重采样由 pandas 明确处理。

理想情况下,dask.dataframe 会自动为您编写每组函数。目前 dask.dataframe 不能智能地处理多索引,或者在多列 groupbys 之上重新采样,因此自动解决方案尚不可用。尽管如此,在仍然使用 dask.dataframe 相应地准备组的同时,很有可能回退到 pandas 进行每块计算。

于 2015-11-26T22:07:22.263 回答
-1

尝试将您的索引转换为 DatetimeIndex ,如下所示:

import datetime
# ...
df.index = dd.DatetimeIndex(df.index.map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')))
# ...
于 2015-11-26T18:55:47.990 回答