1

我正在进行批量数据处理,为了速度,我首先将数据存储在 redis 中,然后每隔 2 分钟将其转储到 postgresql 数据库中。我使用 redis 散列来存储数据,甚至 redis 中的散列键对应于数据库表中的列。

redis.scan()用来获取存储数据行的redis哈希列表,然后redis.hgetall()用来获取哈希中的数据。从那里我在 SqlAlchemy 中创建一个 SQL Insert 语句,并将批量数据插入到数据库中。

我面临的问题是我必须首先使用 SCAN 提取包含我的数据的密钥:

redis_match = 'data:row_keys:*'
row_keys = rdb.scan_iter(match=redis_match, count=limit_no)

从那里我获取每个哈希中的所有数据:

for index, row_id in enumerate(row_keys):
    row_data = rdb.hgetall(row_id)

row_data 的形式是,key:value但它以形式存储,byte因此会产生额外的开销来手动解码每个键和值:

for key, value in row_data.items():
  key = ast.literal_eval(key.decode('unicode_escape'))
  value = ast.literal_eval(value.decode('unicode_escape'))

我觉得这太过分了,必须有一种更优雅的方式来:

  1. 使用 hgetall() 从 redis 获取数据并能够立即将该数据用于批量 SQL 插入,因为 redis 哈希中的键对应于 postgresql 表中的列名
  2. 即使 1 是不可能的,至少必须有一种更快的方法来使用 hgetall() 从 redis 获取数据,并对整个条目进行一些即时解码,即哈希中的所有条目,而不是迭代每个键和值

编辑:

我已经阅读了关于 postgresql 的 Foreign Data Wrappers 尤其是redis_fdw的信息,我想知道它是否可以解决我的问题,即以尽可能少的麻烦更快地将数据从 Redis 移动到 Postgresql

4

1 回答 1

1

redis_fdw是要走的路。请记住,散列集的每个成员在相应的 Pg 外部表中不会是不同的行。相反,它将在外部表中为每个 Redis 哈希创建一行,并为所有哈希值使用 Pg 数组。

例如,对于 Redis 中的以下哈希:

myhash = {a:1, b:2}

您可以创建外部表:

CREATE FOREIGN TABLE my_pg_hash (key text, val text[])
SERVER redis_server
OPTIONS (database '0', tabletype 'hash', tablekeyprefix 'myhash');

外部表my_pg_hash将包含整个 Redis 哈希集的单行myhash。该行将具有作为键myhash和值的postgres 数组,其中包含您的 redis 哈希的所有键/值对。

SELECT * FROM my_pg_hash;

 key      |    val    
----------+-----------
 myhash   | {a,1,b,2}
(1 row)

您可以使用 Pg 的unnest()函数将 val 数组拆分为单独的行:

SELECT key, unnest(val) FROM my_pg_hash;

  key   | unnest 
--------+--------
 myhash | a
 myhash | 1
 myhash | b
 myhash | 2
(4 rows)
于 2016-01-06T09:56:48.190 回答