我有几个指标(每秒 2000 行 x 100 列数据帧,可能更大),我想将它们存储在 OpenTSDB 中。为此,我需要以数据库可以理解的方式格式化值,无论是telnet 样式还是json 样式。
问题是,对于一个天真的 python 函数,我无法足够快地处理它们。这是我的第一种方法:
def ndarray_to_opentsdb_series_comprehension(frame, table, **tags):
series = [{
"metric": '{table}.{col}'.format(
table=table, col=frame.columns[col].item()
),
"timestamp": frame.index[idx].item(),
"value": val.item(),
"tags": tags
} for col, serie in frame.iteritems() for idx, val in serie.iteritems()]
return json.dumps(series)
timeit
在 2000x100 数据帧上使用,我得到:
In [1]: %timeit utilities.ndarray_to_opentsdb_series_comprehension(f, 't1', p1='p1')
1 loops, best of 3: 3.9 s per loop
然后我尝试使用该DataFrame.apply
函数更有效地迭代我的数据,但我必须这样做几次才能获得我需要的所有信息:
def ndarray_to_opentsdb_series_tmp_df(frame, table, **tags):
tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
df = frame.apply(lambda s: s.apply(lambda e: '{ts} {v} {tags}'.format(ts=s.name, v=e, tags=tags_str)), axis=1)
df = df.apply(lambda s: s.apply(lambda e: '{table}.{col} {v}'.format(table=table, col=s.name, v=e)), axis=0)
flat = [e for l in df.values.tolist() for e in l]
return '\n'.join(flat)
(我尝试了其他没有创建多个数据帧的实现,但它大致和这个一样快)。
在这里,timeit
说:
In[1]: %timeit utilities.ndarray_to_opentsdb_series_tmp_df(f, 't1', p1='p1')
1 loops, best of 3: 2.59 s per loop
我已经获得了超过一秒钟的时间,但这还不够,我需要能够在一秒钟内处理这么多数据。在我的测试中,我意识到最耗时的是在我的 DataFrame 中检索给定值的索引列对,但我需要这些来构建我的 OpenTSDB 请求。
有没有办法只使用 python 来处理大数据帧,或者我应该尝试在 Cython 中实现这个逻辑?我知道我可以获得巨大的改进,但我想在尝试使用较低级别的语言进行优化之前确保我拥有最佳的 Python 代码。