0

几乎在同一时间(即 redigo 报出 write: connection reset by peer 错误的时间点),Redis 记录了如下错误日志:
Client id=45183 addr=127.0.0.1:40420 fd=39 name= age=39706 idle=46 flags=N db=0 sub=8 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=16114 oll=528 omem=8545237 events=rw cmd=ping scheduled to be closed ASAP for overcoming of output buffer limits.

Go 程序的错误日志:

write tcp 127.0.0.1:40806->127.0.0.1:6379: write: connection reset by peer

在此之前,Go 程序大约有 7 分钟没有收到订阅消息。我认为这是由于消息没有被及时消费,导致 Redis 的客户端输出缓冲区溢出。

Redis 的客户端输出缓冲区限制(client-output-buffer-limit)使用的是默认配置。Linux 的文件描述符数量和连接数都正常,我找不到原因。

这是我的代码:

server.go

// WaitFroMsg subscribes to the given channels on a connection taken from pool
// and calls onMessage for every message received, until one of the following
// happens:
//
//   - ctx is cancelled: UNSUBSCRIBE is sent and nil is returned, or an error
//     if the command could not be written;
//   - onMessage returns an error: that error is returned;
//   - receiving from the connection fails: the wrapped receive error is returned;
//   - the once-a-minute health-check PING cannot be written: that error is
//     returned.
//
// The connection is always released back to the pool before WaitFroMsg
// returns. It is released only after the receiving goroutine has stopped,
// because redigo permits a single concurrent Receive caller and closing a
// subscribed pooled connection reads from it. As a consequence WaitFroMsg does
// not return while an onMessage call is still in flight.
//
// onMessage runs on the receiving goroutine, so no further messages are read
// while it is executing. A slow callback lets published messages pile up in
// the server-side output buffer, and Redis disconnects subscribers that exceed
// client-output-buffer-limit; keep onMessage short or hand work off to a queue.
//
// NOTE(review): the pool's Dial sets no read timeout, so if the server stops
// responding without closing the socket, Receive (and therefore this function)
// can block indefinitely. Consider redis.DialReadTimeout with a value larger
// than the health-check period.
func WaitFroMsg(ctx context.Context, pool *redis.Pool, onMessage func(channel string, data []byte) error, channel ...string) (err error) {
    conn := pool.Get()
    psc := redis.PubSubConn{Conn: conn}
    if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
        // Nothing is reading from the connection yet, so it can be released
        // immediately. The Close error is irrelevant next to the subscribe error.
        _ = psc.Close()
        return err
    }

    // done carries the receiver's result; the buffer of one lets the receiver
    // finish even when nobody is selecting on done any more.
    done := make(chan error, 1)
    // stopped is closed once the receiving goroutine has returned.
    stopped := make(chan struct{})

    // Release the connection only after the receiver is gone. Every return
    // path below either follows the receiver's own exit or a failed/successful
    // write that makes the pending Receive return.
    defer func() {
        <-stopped
        _ = psc.Close() // best effort: the function result is already decided
    }()

    go func() {
        defer close(stopped)
        for {
            switch n := psc.Receive().(type) {
            case error:
                done <- fmt.Errorf("redis pubsub receive err: %v", n)
                return
            case redis.Message:
                // Use a goroutine-local variable: assigning to the named
                // result from here would race with the caller's goroutine.
                if cbErr := onMessage(n.Channel, n.Data); cbErr != nil {
                    done <- cbErr
                    return
                }
            case redis.Subscription:
                if n.Count == 0 {
                    fmt.Println("all channels are unsubscribed", channel)
                    done <- nil
                    return
                }
            }
        }
    }()

    const healthCheck = time.Minute
    ticker := time.NewTicker(healthCheck)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            // A PING keeps the connection from looking idle and surfaces a
            // dead connection as a write error.
            if err := psc.Ping(""); err != nil {
                fmt.Println("healthCheck ", err, channel)
                return err
            }
        case err := <-done:
            return err
        case <-ctx.Done():
            // Unsubscribing from all channels makes the receiver see a
            // Subscription with Count == 0 and exit on its own.
            if err := psc.Unsubscribe(); err != nil {
                return fmt.Errorf("redis unsubscribe failed: %v", err)
            }
            return nil
        }
    }
}

pool.go

// NewPool returns a redigo connection pool for the Redis server at addr.
//
// Every new connection is dialed over TCP and switched to database db with
// SELECT; if SELECT fails the connection is closed and the error is returned
// from Dial. The pool keeps at most 3 idle connections and treats a
// connection as stale after 240 seconds of idleness.
//
// Before an idle connection is handed out it is checked with PING, unless the
// time t passed to TestOnBorrow (per redigo, the time the connection was
// returned to the pool) is less than a minute ago.
func NewPool(addr string, db int) *redis.Pool {
    return &redis.Pool{
        MaxIdle:     3,
        IdleTimeout: 240 * time.Second,
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", addr)
            if err != nil {
                return nil, err
            }
            if _, err = c.Do("SELECT", db); err != nil {
                // The SELECT error is what the caller needs; a Close failure
                // on an already unusable connection adds nothing.
                c.Close()
                return nil, err
            }
            return c, nil
        },
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
                return nil
            }
            _, err := c.Do("PING")
            // Log only real failures; an unconditional print would emit
            // "PING error <nil>" on every successful health check.
            if err != nil {
                fmt.Println("PING error", err)
            }
            return err
        },
    }
}
4

0 回答 0