将Redis Stream输出(aioredis 客户端/hiredis 解析器)转换为 Pandas Dataframe的最快方法是什么,其中 Redis Stream ID 的时间戳和序列号以及值是正确类型转换的 Pandas 索引列?
示例 Redis 输出:
[[b'1554900384437-0', [b'key', b'1']],
[b'1554900414434-0', [b'key', b'1']]]
将Redis Stream输出(aioredis 客户端/hiredis 解析器)转换为 Pandas Dataframe的最快方法是什么,其中 Redis Stream ID 的时间戳和序列号以及值是正确类型转换的 Pandas 索引列?
示例 Redis 输出:
[[b'1554900384437-0', [b'key', b'1']],
[b'1554900414434-0', [b'key', b'1']]]
这里似乎有两个主要瓶颈:
Pandas DataFrames 以列优先格式存储它们的数据,这意味着每一列映射到一个 numpy 数组,而 Redis 流数据是逐行的。
Pandas MultiIndex 是为分类数据制作的,将原始数组转换为所需的级别/代码结构似乎没有优化
由于数字 1,循环遍历所有 Redis 流条目是不可避免的。假设我们事先知道长度,我们可以预先分配我们填充的 numpy 数组,并通过一些技巧将这些数组重用为 DataFrame 列。如果在 Python 中循环的开销仍然太大,那么在 Cython 中重写应该很简单。
由于您没有指定数据类型,因此答案使用 numpy.object 数组将所有内容保存在字节中,因此如何适应自定义设置应该是相当明显的。将所有列放在同一个数组中的唯一原因是将列/字段上的内部循环从 Python 移动到 C。它可以拆分为例如每种数据类型一个数组或每列一个数组。
from functools import partial, reduce
import numpy as np
import pandas as pd
data = [[b'1554900384437-0', [b'foo', b'1', b'bar', b'2', b'bla', b'abc']],
[b'1554900414434-0', [b'foo', b'3', b'bar', b'4', b'bla', b'xyz']]]
colnames = data[0][1][0::2]
ncols = len(colnames)
nrows = len(data)
ts_seq = np.empty((2, nrows), dtype=np.int64)
cols = np.empty((ncols, nrows), dtype=np.object)
for i,(id,fields) in enumerate(data):
ts, seq = id.split(b"-", 2)
ts_seq[:, i] = (int(ts), int(seq))
cols[:, i] = fields[1::2]
colframes = [pd.DataFrame(cols[i:i+1, :].T) for i in range(ncols)]
merge = partial(pd.merge, left_index=True, right_index=True, copy=False)
df = reduce(merge, colframes[1:], colframes[0])
df.columns = colnames
对于数字 2,我们可以使用numpy.unique
创建 Pandas MultiIndex 所需的级别/代码结构。从文档看来,它numpy.unique
也对数据进行了排序。由于我们的数据可能已经排序,未来可能的优化是尝试跳过排序步骤。
ts = ts_seq[0, :]
seq = ts_seq[1, :]
maxseq = np.max(seq)
ts_levels, ts_codes = np.unique(ts, return_inverse=True)
seq_levels = np.arange(maxseq+1)
seq_codes = seq
df.index = pd.MultiIndex(levels=[ts_levels, seq_levels], codes=[ts_codes, seq_codes], names=["Timestamp", "Seq"])
最后,我们可以通过做来验证没有涉及复制
cols[0, 0] = b'79'
并检查中的条目df
确实发生了变化。
最快的方法是使用批处理数据
以 N 条消息为批次的 IO(即每批次 100 条消息)
将此批次转换为 1 个数据帧(使用 pd.DataFrame([]))
将 lambda 或转换函数应用于转换为 numpy (.values) 的时间戳列。翼:
df['time'] = [datetime.fromtimestamp(t.split('-')[0]) for t in df['time'].values]
你可以使用这个:
pd.read_msgpack(redisConn.get("key"))