6

我正在寻找Redis连接丢失后[在多线程环境中]恢复的参考实现。到目前为止找不到任何有意义的东西。

设置:我有一个 Azure 辅助角色,它在多个线程中运行相同的代码 (ThreadProc)。最初,我有静态 ConnectionMultiplexer 并在每次 Redis 操作之前执行 .GetDatabase() 。那根本没有通过压力测试(一旦负载从低增加到中等,就会出现大量“无法连接”的错误)。我已将其更改为:

static readonly ConnectionMultiplexer _connection = ConnectionMultiplexer.Connect(...);
static readonly IDatabase _cache = _connection.GetDatabase();

void ThreadProc() // running in multiple threads
{
    while (true)
    {
      // using _cache here
    }
}

即使在高负载下(每个工作角色实例 1000+ 操作/秒),它也能很好地工作,直到我得到“没有可用的连接来服务这个操作”,然后事情就无法恢复。

请让我知道可以从间歇性连接问题中恢复的正确/推荐代码是什么。

4

2 回答 2

10

编辑 2015-05-02:虽然 StackExchange.Redis 客户端的更高版本明确应该在内部和自动处理这种“丢失的连接”->“重新连接”逻辑,但我的测试已经毫无疑问地表明他们没有不能成功地完成它,因此仍然需要这种东西,至少在繁忙的环境中是这样。我将下面的代码从我的缓存层中抽出一段时间,结果出现了数以万计的连接失败错误。我把它放回去,那些都消失了。

编辑 2015-02-24:不再需要此方法。最新版本的StackExchange.Redis客户端正确处理断开连接 - 他们会自动重新连接,下面的解决方法只会干扰事情。出于历史目的将其保留在这里,但我的建议是忽略它。


以下是我的包装器中的一些方法,SimpleCacheRedis<T>它们显示了我如何处理问题:

public async Task<TValue> GetAsync(string key, Func<Task<TValue>> missingFunc)
{
    key = GetKey(key);
    var value = default(TValue);
    try
    {
        var db = _connection.GetDatabase();
        var str = await db.StringGetAsync(key);
        if (!str.IsNullOrEmpty)
        {
            value = _jsonSerializer.Deserialize<TValue>(str);
        }
    }
    catch (RedisConnectionException ex)
    {
        HandleRedisConnectionError(ex);
    }
    catch (Exception ex)
    {
        _logger.Error("Error retrieving item '" + key +
                      "' from Redis cache; falling back to missingFunc(). Error = " + ex);
    }
    if (value == default(TValue))
    {
        present = false;
        value = await missingFunc();
        await PerformAddAsync(key, value);
    }
    return value;
}

private void HandleRedisConnectionError(RedisConnectionException ex)
{
    _logger.Error("Connection error with Redis cache; recreating connection for the next try, and falling back to missingFunc() for this one. Error = " + ex.Message);
    Task.Run(async () =>
    {
        try
        {
            await CreateConnectionAsync();
        }
        catch (Exception genEx)
        {
            _logger.Error("Unable to recreate redis connection (sigh); bailing for now: " + genEx.Message);
        }
    });
}

private async Task CreateConnectionAsync()
{
    if (_attemptingToConnect) return;
    var sw = new StringWriter();
    try
    {
        _attemptingToConnect = true;
        _connection = await ConnectionMultiplexer.ConnectAsync(_redisCs, sw);
    }
    catch (Exception ex)
    {
        _logger.Error("Unable to connect to redis async: " + ex);
        _logger.Debug("internal log: \r\n" + sw);
        throw;
    }
    finally
    {
        _attemptingToConnect = false;
    }
}

基本上,如果我发现由于 a 无法连接到 Redis RedisConnectionException,我会分拆一个单独的async任务来重新创建共享连接。当然,那个调用可能会失败,但无论如何,在那段时间调用都会失败。一旦成功,任何新调用都将使用该新(重新)创建的连接。就像我上面说的,有点无聊。

我的情况可能与您的情况有些不同,因为我没有将 Redis 用作永久存储,而只是用作缓存。这意味着丢失 redis 连接的唯一影响是我需要从数据库而不是缓存中检索结果。所以我可以稍微松散地对待某些事情。

于 2014-08-05T19:21:51.697 回答
2

好吧,如果没有人愿意,我想我会回答我自己的问题,尽管这看起来很奇怪,因为它是一个如此基本的用例。

这是管理连接丢失的类:

static class RedisConnectionManager
{
    private static readonly Dictionary<string, IDatabase> _dictionary = new Dictionary<string, IDatabase>();

    internal static IDatabase GetDatabase(string connectionString)
    {
        lock (_dictionary)
        {
            if (!_dictionary.ContainsKey(connectionString))
                _dictionary.Add(connectionString, ConnectionMultiplexer.Connect(connectionString).GetDatabase());
            if (!_dictionary[connectionString].Multiplexer.IsConnected)
            {
                _dictionary[connectionString].Multiplexer.Dispose();
                _dictionary[connectionString] = ConnectionMultiplexer.Connect(connectionString).GetDatabase();
            }
            return _dictionary[connectionString];
        }
    }
}

这个类处理多个连接字符串,所以如果你只有一个,代码会更简单。请注意显式Multiplexer.Dispose()调用。由于底层对象拥有物理 TCP 连接,因此您不能等到 GC 启动才释放资源。到那时,根据您的负载,您可能会有数千个孤立的 TCP 连接。

这段代码运行得相当好,但我仍然不能 100% 确定这是处理这个问题的最佳方法。如果有人知道如何改进这一点,请告诉我。

于 2014-11-07T21:37:55.107 回答