42

我正在开发一个用于访问 Redis 服务器的 Python 服务(类)。我想知道如何检查 Redis 服务器是否正在运行。而且,如果不知何故我无法连接到它。

这是我的代码的一部分

import redis
rs = redis.Redis("localhost")
print rs

它打印以下内容

<redis.client.Redis object at 0x120ba50>

即使我的 Redis 服务器没有运行。

因为我发现我的 Python 代码只有在我使用我的 redis 实例执行set()get()时才会连接到服务器。

所以我不希望使用我的班级的其他服务得到一个例外说

redis.exceptions.ConnectionError: Error 111 connecting localhost:6379. Connection refused.

我想返回正确的消息/错误代码。我怎样才能做到这一点??

4

7 回答 7

35

检查 redis 服务器可用性的官方方法是 ping ( http://redis.io/topics/quickstart )。

一种解决方案是将 redis 子类化并做两件事:

  1. 在实例化时检查连接
  2. 发出请求时在没有连接的情况下编写异常处理程序
于 2012-10-19T06:38:54.383 回答
30

如果要在启动时测试一次 redis 连接,请使用该ping()命令。

from redis import Redis

redis_host = '127.0.0.1'
r = Redis(redis_host, socket_connect_timeout=1) # short timeout for the test

r.ping() 

print('connected to redis "{}"'.format(redis_host)) 

该命令ping()检查连接,如果无效将引发异常。

  • 注意 - 执行测试后连接可能仍会失败,因此这不会掩盖以后的超时异常。
于 2012-10-12T13:23:26.200 回答
18

正如您所说,仅当您尝试在服务器上执行命令时才建立与 Redis 服务器的连接。如果您不想在不检查服务器是否可用的情况下继续前进,您可以向服务器发送随机查询并检查响应。就像是 :

try:
    response = rs.client_list()
except redis.ConnectionError:
    #your error handlig code here    
于 2012-10-12T13:37:11.523 回答
14

这里已经有很好的解决方案,但这是我对 django_redis 的快速而肮脏的解决方案,它似乎不包含 ping 功能(尽管我使用的是旧版本的 django 并且不能使用最新的 django_redis)。

# assuming rs is your redis connection
def is_redis_available():
    # ... get redis connection here, or pass it in. up to you.
    try:
        rs.get(None)  # getting None returns None or throws an exception
    except (redis.exceptions.ConnectionError, 
            redis.exceptions.BusyLoadingError):
        return False
    return True

这似乎工作得很好。请注意,如果 redis 正在重新启动并且仍在加载保存磁盘上缓存条目的 .rdb 文件,那么它将抛出 BusyLoadingError,尽管它的基类是 ConnectionError,因此只需捕获它就可以了。

你也可以简单地 except on redis.exceptions.RedisErrorwhich 是所有 redis 异常的基类。

根据您的需要,另一种选择是创建获取和设置函数,以在设置/获取值时捕获 ConnectionError 异常。然后你可以继续或等待或任何你需要做的事情(引发一个新的异常或只是抛出一个更有用的错误消息)。

如果您绝对依赖于设置/获取缓存值(出于我的目的,如果缓存因我们通常必须“继续”而处于脱机状态),这可能无法正常工作,在这种情况下,有例外并让程序/脚本死亡并使redis服务器/服务恢复到可访问状态。

于 2015-07-23T20:41:52.950 回答
5

当 redis 没有运行时,我还从 sockets 库中遇到了一个ConnectionRefusedError,因此我不得不将它添加到可用性检查中。

r = redis.Redis(host='localhost',port=6379,db=0)

def is_redis_available(r):
    try:
        r.ping()
        print("Successfully connected to redis")
    except (redis.exceptions.ConnectionError, ConnectionRefusedError):
        print("Redis connection error!")
        return False
    return True

if is_redis_available(r):
    print("Yay!")
于 2020-09-15T15:02:10.963 回答
4

可以通过对服务器执行 ping 命令来检查 Redis 服务器连接。

>>> import redis
>>> r = redis.Redis(host="127.0.0.1", port="6379")
>>> r.ping()
True

使用该ping方法,我们可以处理重新连接等。为了了解连接错误的原因,可以按照其他答案中的建议使用异常处理。

try:
    is_connected = r.ping()
except redis.ConnectionError:
    # handle error
于 2021-07-21T17:00:18.800 回答
3

使用 ping()

from redis import Redis

conn_pool = Redis(redis_host)
# Connection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

try:
    conn_pool.ping()
    print('Successfully connected to redis')
except redis.exceptions.ConnectionError as r_con_error:
    print('Redis connection error')
    # handle exception
于 2019-12-12T21:08:48.250 回答