66

使用 pandas 处理实时传入数据的最推荐/pythonic 方式是什么?

每隔几秒钟,我就会收到以下格式的数据点:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

我想将它附加到现有的 DataFrame 中,然后对其进行一些分析。

问题是,仅使用 DataFrame.append 附加行可能会导致所有复制的性能问题。

我尝试过的事情:

一些人建议预先分配一个大 DataFrame 并在数据进入时对其进行更新:

In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)

In [2]: columns = ['high', 'low', 'open', 'close']

In [3]: df = pd.DataFrame(index=t, columns=columns)

In [4]: df
Out[4]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02  NaN  NaN  NaN   NaN
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

In [6]: data_ = pd.Series(data)

In [7]: df.loc[data['time']] = data_

In [8]: df
Out[8]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02    4    3    2     1
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

另一种选择是建立一个字典列表。只需将传入的数据附加到列表中,然后将其分割成更小的 DataFrame 即可完成工作。

In [9]: ls = []

In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)

In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')

In [12]: df
Out[12]: 
                        close      high       low      open stock
time                                                             
2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH
2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH

或类似的东西,也许更多地处理输入。

4

2 回答 2

25

我会使用 HDF5/pytables 如下:

  1. 将数据“尽可能长”地保存为 python 列表。
  2. 将您的结果附加到该列表中。
  3. 当它变得“大”时:
    • 使用 pandas io(和一个可附加的表)推送到 HDF5 Store。
    • 清除列表。
  4. 重复。

事实上,我定义的函数为每个“键”使用了一个列表,以便您可以在同一进程中将多个 DataFrame 存储到 HDF5 Store。


我们定义了一个函数,您可以在每一行中调用它d

CACHE = {}
STORE = 'store.h5'   # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.

    When the number of items in the key's cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.

    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key's cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()

注意:我们使用 with 语句在每次写入后自动关闭存储。保持打开可能会更快,但如果是这样,建议您定期冲洗(关闭冲洗)另请注意,使用集合双端队列而不是列表可能更具可读性,但此处列表的性能会稍好一些。

要使用它,您可以调用:

process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
            key="df")

注意:“df”是pytables 存储中使用的存储密钥。

作业完成后,确保您store_and_clear有剩余的缓存:

for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)

现在您可以通过以下方式获得完整的 DataFrame:

with pd.HDFStore(STORE) as store:
    df = store["df"]                    # other keys will be store[key]

一些评论:

  • 5000 可以调整,尝试使用一些更小/更大的数字来满足您的需求。
  • 列表追加是 O(1),数据帧追加是 O( len(df))。
  • 在您进行统计或数据处理之前,您不需要 pandas,请使用最快的。
  • 此代码适用于传入的多个键(数据点)。
  • 这是非常少的代码,我们停留在 vanilla python list 和 pandas dataframe 中......

此外,为了获得最新的读数,您可以定义一个 get 方法,该方法在读取之前存储和清除。通过这种方式,您将获得最新的数据:

def get_latest(key, _cache=CACHE):
    store_and_clear(_cache[key], key)
    with pd.HDFStore(STORE) as store:
        return store[key]

现在,当您访问时:

df = get_latest("df")

您将获得可用的最新“df”。


另一个选项稍微复杂一些:在 vanilla pytables 中定义一个自定义表,请参阅教程

注意:您需要知道字段名称才能创建列描述符

于 2015-12-15T06:22:12.697 回答
10

您实际上是在尝试解决两个问题:捕获实时数据并分析该数据。第一个问题可以通过专门为此目的而设计的Python logging来解决。然后可以通过读取相同的日志文件来解决另一个问题。

于 2013-06-12T00:40:02.933 回答