我有以下从 Castra 创建的 dask 数据框:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
产量:
user_id / ts / text
ts
2015-08-08 01:10:00 9235 2015-08-08 01:10:00 a
2015-08-08 02:20:00 2353 2015-08-08 02:20:00 b
2015-08-08 02:20:00 9235 2015-08-08 02:20:00 c
2015-08-08 04:10:00 9235 2015-08-08 04:10:00 d
2015-08-08 08:10:00 2353 2015-08-08 08:10:00 e
我想做的是:
user_id
按和分组ts
- 在 3 小时内重新采样
- 在重采样步骤中,任何合并的行都应该连接文本
示例输出:
text
user_id ts
9235 2015-08-08 00:00:00 ac
2015-08-08 03:00:00 d
2353 2015-08-08 00:00:00 b
2015-08-08 06:00:00 e
我尝试了以下方法:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
并得到以下错误:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
我尝试传入set_index('ts')
管道,但它似乎不是Series
.
关于如何实现这一目标的任何想法?
TL;博士
如果它使问题变得更容易,我还可以更改我创建的 Castra DB 的格式。我目前的实现很大程度上取自这篇很棒的帖子。
我将索引(在to_df()
函数中)设置如下:
df.set_index('ts',drop=False,inplace=True)
并且有:
with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
batches = partition_all(batch_size, f)
df, frames = peek(map(self.to_df, batches))
castra = Castra(S.CASTRA, template=df, categories=categories)
castra.extend_sequence(frames, freq='3h')
以下是生成的 dtypes:
ts datetime64[ns]
text object
user_id float64