diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 13:26:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 15:44:45 +0300 |
commit | 0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch) | |
tree | 291d72dbd7e9865399f668c84d11ed86fb190bbf /vendor/github.com/go-redis/redis/v8/internal/pool | |
parent | cb2c8d75065e5b3c47094067cb4aa407d4813298 (diff) | |
download | ydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz |
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'vendor/github.com/go-redis/redis/v8/internal/pool')
5 files changed, 959 insertions, 0 deletions
diff --git a/vendor/github.com/go-redis/redis/v8/internal/pool/conn.go b/vendor/github.com/go-redis/redis/v8/internal/pool/conn.go new file mode 100644 index 0000000000..5661659865 --- /dev/null +++ b/vendor/github.com/go-redis/redis/v8/internal/pool/conn.go @@ -0,0 +1,121 @@ +package pool + +import ( + "bufio" + "context" + "net" + "sync/atomic" + "time" + + "github.com/go-redis/redis/v8/internal/proto" +) + +var noDeadline = time.Time{} + +type Conn struct { + usedAt int64 // atomic + netConn net.Conn + + rd *proto.Reader + bw *bufio.Writer + wr *proto.Writer + + Inited bool + pooled bool + createdAt time.Time +} + +func NewConn(netConn net.Conn) *Conn { + cn := &Conn{ + netConn: netConn, + createdAt: time.Now(), + } + cn.rd = proto.NewReader(netConn) + cn.bw = bufio.NewWriter(netConn) + cn.wr = proto.NewWriter(cn.bw) + cn.SetUsedAt(time.Now()) + return cn +} + +func (cn *Conn) UsedAt() time.Time { + unix := atomic.LoadInt64(&cn.usedAt) + return time.Unix(unix, 0) +} + +func (cn *Conn) SetUsedAt(tm time.Time) { + atomic.StoreInt64(&cn.usedAt, tm.Unix()) +} + +func (cn *Conn) SetNetConn(netConn net.Conn) { + cn.netConn = netConn + cn.rd.Reset(netConn) + cn.bw.Reset(netConn) +} + +func (cn *Conn) Write(b []byte) (int, error) { + return cn.netConn.Write(b) +} + +func (cn *Conn) RemoteAddr() net.Addr { + if cn.netConn != nil { + return cn.netConn.RemoteAddr() + } + return nil +} + +func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error { + if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { + return err + } + return fn(cn.rd) +} + +func (cn *Conn) WithWriter( + ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error, +) error { + if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil { + return err + } + + if cn.bw.Buffered() > 0 { + cn.bw.Reset(cn.netConn) + } + + if err := fn(cn.wr); err != nil { + return err + } + + return cn.bw.Flush() +} + +func (cn *Conn) Close() error { + return cn.netConn.Close() +} + +func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time { + tm := time.Now() + cn.SetUsedAt(tm) + + if timeout > 0 { + tm = tm.Add(timeout) + } + + if ctx != nil { + deadline, ok := ctx.Deadline() + if ok { + if timeout == 0 { + return deadline + } + if deadline.Before(tm) { + return deadline + } + return tm + } + } + + if timeout > 0 { + return tm + } + + return noDeadline +} diff --git a/vendor/github.com/go-redis/redis/v8/internal/pool/pool.go b/vendor/github.com/go-redis/redis/v8/internal/pool/pool.go new file mode 100644 index 0000000000..44a4e779df --- /dev/null +++ b/vendor/github.com/go-redis/redis/v8/internal/pool/pool.go @@ -0,0 +1,557 @@ +package pool + +import ( + "context" + "errors" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/go-redis/redis/v8/internal" +) + +var ( + // ErrClosed performs any operation on the closed client will return this error. + ErrClosed = errors.New("redis: client is closed") + + // ErrPoolTimeout timed out waiting to get a connection from the connection pool. + ErrPoolTimeout = errors.New("redis: connection pool timeout") +) + +var timers = sync.Pool{ + New: func() interface{} { + t := time.NewTimer(time.Hour) + t.Stop() + return t + }, +} + +// Stats contains pool state information and accumulated stats. +type Stats struct { + Hits uint32 // number of times free connection was found in the pool + Misses uint32 // number of times free connection was NOT found in the pool + Timeouts uint32 // number of times a wait timeout occurred + + TotalConns uint32 // number of total connections in the pool + IdleConns uint32 // number of idle connections in the pool + StaleConns uint32 // number of stale connections removed from the pool +} + +type Pooler interface { + NewConn(context.Context) (*Conn, error) + CloseConn(*Conn) error + + Get(context.Context) (*Conn, error) + Put(context.Context, *Conn) + Remove(context.Context, *Conn, error) + + Len() int + IdleLen() int + Stats() *Stats + + Close() error +} + +type Options struct { + Dialer func(context.Context) (net.Conn, error) + OnClose func(*Conn) error + + PoolFIFO bool + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration +} + +type lastDialErrorWrap struct { + err error +} + +type ConnPool struct { + opt *Options + + dialErrorsNum uint32 // atomic + + lastDialError atomic.Value + + queue chan struct{} + + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int + + stats Stats + + _closed uint32 // atomic + closedCh chan struct{} +} + +var _ Pooler = (*ConnPool)(nil) + +func NewConnPool(opt *Options) *ConnPool { + p := &ConnPool{ + opt: opt, + + queue: make(chan struct{}, opt.PoolSize), + conns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), + closedCh: make(chan struct{}), + } + + p.connsMu.Lock() + p.checkMinIdleConns() + p.connsMu.Unlock() + + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { + go p.reaper(opt.IdleCheckFrequency) + } + + return p +} + +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + + go func() { + err := p.addIdleConn() + if err != nil && err != ErrClosed { + p.connsMu.Lock() + p.poolSize-- + p.idleConnsLen-- + p.connsMu.Unlock() + } + }() + } +} + +func (p *ConnPool) addIdleConn() error { + cn, err := p.dialConn(context.TODO(), true) + if err != nil { + return err + } + + p.connsMu.Lock() + defer p.connsMu.Unlock() + + // It is not allowed to add new connections to the closed connection pool. + if p.closed() { + _ = cn.Close() + return ErrClosed + } + + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + return nil +} + +func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error) { + return p.newConn(ctx, false) +} + +func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { + cn, err := p.dialConn(ctx, pooled) + if err != nil { + return nil, err + } + + p.connsMu.Lock() + defer p.connsMu.Unlock() + + // It is not allowed to add new connections to the closed connection pool. + if p.closed() { + _ = cn.Close() + return nil, ErrClosed + } + + p.conns = append(p.conns, cn) + if pooled { + // If pool is full remove the cn on next Put. + if p.poolSize >= p.opt.PoolSize { + cn.pooled = false + } else { + p.poolSize++ + } + } + + return cn, nil +} + +func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + + if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { + return nil, p.getLastDialError() + } + + netConn, err := p.opt.Dialer(ctx) + if err != nil { + p.setLastDialError(err) + if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { + go p.tryDial() + } + return nil, err + } + + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil +} + +func (p *ConnPool) tryDial() { + for { + if p.closed() { + return + } + + conn, err := p.opt.Dialer(context.Background()) + if err != nil { + p.setLastDialError(err) + time.Sleep(time.Second) + continue + } + + atomic.StoreUint32(&p.dialErrorsNum, 0) + _ = conn.Close() + return + } +} + +func (p *ConnPool) setLastDialError(err error) { + p.lastDialError.Store(&lastDialErrorWrap{err: err}) +} + +func (p *ConnPool) getLastDialError() error { + err, _ := p.lastDialError.Load().(*lastDialErrorWrap) + if err != nil { + return err.err + } + return nil +} + +// Get returns existed connection from the pool or creates a new one. +func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + + if err := p.waitTurn(ctx); err != nil { + return nil, err + } + + for { + p.connsMu.Lock() + cn, err := p.popIdle() + p.connsMu.Unlock() + + if err != nil { + return nil, err + } + + if cn == nil { + break + } + + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) + continue + } + + atomic.AddUint32(&p.stats.Hits, 1) + return cn, nil + } + + atomic.AddUint32(&p.stats.Misses, 1) + + newcn, err := p.newConn(ctx, true) + if err != nil { + p.freeTurn() + return nil, err + } + + return newcn, nil +} + +func (p *ConnPool) getTurn() { + p.queue <- struct{}{} +} + +func (p *ConnPool) waitTurn(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + select { + case p.queue <- struct{}{}: + return nil + default: + } + + timer := timers.Get().(*time.Timer) + timer.Reset(p.opt.PoolTimeout) + + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return ctx.Err() + case p.queue <- struct{}{}: + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return nil + case <-timer.C: + timers.Put(timer) + atomic.AddUint32(&p.stats.Timeouts, 1) + return ErrPoolTimeout + } +} + +func (p *ConnPool) freeTurn() { + <-p.queue +} + +func (p *ConnPool) popIdle() (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + n := len(p.idleConns) + if n == 0 { + return nil, nil + } + + var cn *Conn + if p.opt.PoolFIFO { + cn = p.idleConns[0] + copy(p.idleConns, p.idleConns[1:]) + p.idleConns = p.idleConns[:n-1] + } else { + idx := n - 1 + cn = p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + } + p.idleConnsLen-- + p.checkMinIdleConns() + return cn, nil +} + +func (p *ConnPool) Put(ctx context.Context, cn *Conn) { + if cn.rd.Buffered() > 0 { + internal.Logger.Printf(ctx, "Conn has unread data") + p.Remove(ctx, cn, BadConnError{}) + return + } + + if !cn.pooled { + p.Remove(ctx, cn, nil) + return + } + + p.connsMu.Lock() + p.idleConns = append(p.idleConns, cn) + p.idleConnsLen++ + p.connsMu.Unlock() + p.freeTurn() +} + +func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + p.removeConnWithLock(cn) + p.freeTurn() + _ = p.closeConn(cn) +} + +func (p *ConnPool) CloseConn(cn *Conn) error { + p.removeConnWithLock(cn) + return p.closeConn(cn) +} + +func (p *ConnPool) removeConnWithLock(cn *Conn) { + p.connsMu.Lock() + p.removeConn(cn) + p.connsMu.Unlock() +} + +func (p *ConnPool) removeConn(cn *Conn) { + for i, c := range p.conns { + if c == cn { + p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } + return + } + } +} + +func (p *ConnPool) closeConn(cn *Conn) error { + if p.opt.OnClose != nil { + _ = p.opt.OnClose(cn) + } + return cn.Close() +} + +// Len returns total number of connections. +func (p *ConnPool) Len() int { + p.connsMu.Lock() + n := len(p.conns) + p.connsMu.Unlock() + return n +} + +// IdleLen returns number of idle connections. +func (p *ConnPool) IdleLen() int { + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n +} + +func (p *ConnPool) Stats() *Stats { + idleLen := p.IdleLen() + return &Stats{ + Hits: atomic.LoadUint32(&p.stats.Hits), + Misses: atomic.LoadUint32(&p.stats.Misses), + Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + + TotalConns: uint32(p.Len()), + IdleConns: uint32(idleLen), + StaleConns: atomic.LoadUint32(&p.stats.StaleConns), + } +} + +func (p *ConnPool) closed() bool { + return atomic.LoadUint32(&p._closed) == 1 +} + +func (p *ConnPool) Filter(fn func(*Conn) bool) error { + p.connsMu.Lock() + defer p.connsMu.Unlock() + + var firstErr error + for _, cn := range p.conns { + if fn(cn) { + if err := p.closeConn(cn); err != nil && firstErr == nil { + firstErr = err + } + } + } + return firstErr +} + +func (p *ConnPool) Close() error { + if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) { + return ErrClosed + } + close(p.closedCh) + + var firstErr error + p.connsMu.Lock() + for _, cn := range p.conns { + if err := p.closeConn(cn); err != nil && firstErr == nil { + firstErr = err + } + } + p.conns = nil + p.poolSize = 0 + p.idleConns = nil + p.idleConnsLen = 0 + p.connsMu.Unlock() + + return firstErr +} + +func (p *ConnPool) reaper(frequency time.Duration) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // It is possible that ticker and closedCh arrive together, + // and select pseudo-randomly pick ticker case, we double + // check here to prevent being executed after closed. + if p.closed() { + return + } + _, err := p.ReapStaleConns() + if err != nil { + internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err) + continue + } + case <-p.closedCh: + return + } + } +} + +func (p *ConnPool) ReapStaleConns() (int, error) { + var n int + for { + p.getTurn() + + p.connsMu.Lock() + cn := p.reapStaleConn() + p.connsMu.Unlock() + + p.freeTurn() + + if cn != nil { + _ = p.closeConn(cn) + n++ + } else { + break + } + } + atomic.AddUint32(&p.stats.StaleConns, uint32(n)) + return n, nil +} + +func (p *ConnPool) reapStaleConn() *Conn { + if len(p.idleConns) == 0 { + return nil + } + + cn := p.idleConns[0] + if !p.isStaleConn(cn) { + return nil + } + + p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- + p.removeConn(cn) + + return cn +} + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/vendor/github.com/go-redis/redis/v8/internal/pool/pool_single.go b/vendor/github.com/go-redis/redis/v8/internal/pool/pool_single.go new file mode 100644 index 0000000000..5a3fde191b --- /dev/null +++ b/vendor/github.com/go-redis/redis/v8/internal/pool/pool_single.go @@ -0,0 +1,58 @@ +package pool + +import "context" + +type SingleConnPool struct { + pool Pooler + cn *Conn + stickyErr error +} + +var _ Pooler = (*SingleConnPool)(nil) + +func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool { + return &SingleConnPool{ + pool: pool, + cn: cn, + } +} + +func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error) { + return p.pool.NewConn(ctx) +} + +func (p *SingleConnPool) CloseConn(cn *Conn) error { + return p.pool.CloseConn(cn) +} + +func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) { + if p.stickyErr != nil { + return nil, p.stickyErr + } + return p.cn, nil +} + +func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) {} + +func (p *SingleConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + p.cn = nil + p.stickyErr = reason +} + +func (p *SingleConnPool) Close() error { + p.cn = nil + p.stickyErr = ErrClosed + return nil +} + +func (p *SingleConnPool) Len() int { + return 0 +} + +func (p *SingleConnPool) IdleLen() int { + return 0 +} + +func (p *SingleConnPool) Stats() *Stats { + return &Stats{} +} diff --git a/vendor/github.com/go-redis/redis/v8/internal/pool/pool_sticky.go b/vendor/github.com/go-redis/redis/v8/internal/pool/pool_sticky.go new file mode 100644 index 0000000000..3adb99bc82 --- /dev/null +++ b/vendor/github.com/go-redis/redis/v8/internal/pool/pool_sticky.go @@ -0,0 +1,201 @@ +package pool + +import ( + "context" + "errors" + "fmt" + "sync/atomic" +) + +const ( + stateDefault = 0 + stateInited = 1 + stateClosed = 2 +) + +type BadConnError struct { + wrapped error +} + +var _ error = (*BadConnError)(nil) + +func (e BadConnError) Error() string { + s := "redis: Conn is in a bad state" + if e.wrapped != nil { + s += ": " + e.wrapped.Error() + } + return s +} + +func (e BadConnError) Unwrap() error { + return e.wrapped +} + +//------------------------------------------------------------------------------ + +type StickyConnPool struct { + pool Pooler + shared int32 // atomic + + state uint32 // atomic + ch chan *Conn + + _badConnError atomic.Value +} + +var _ Pooler = (*StickyConnPool)(nil) + +func NewStickyConnPool(pool Pooler) *StickyConnPool { + p, ok := pool.(*StickyConnPool) + if !ok { + p = &StickyConnPool{ + pool: pool, + ch: make(chan *Conn, 1), + } + } + atomic.AddInt32(&p.shared, 1) + return p +} + +func (p *StickyConnPool) NewConn(ctx context.Context) (*Conn, error) { + return p.pool.NewConn(ctx) +} + +func (p *StickyConnPool) CloseConn(cn *Conn) error { + return p.pool.CloseConn(cn) +} + +func (p *StickyConnPool) Get(ctx context.Context) (*Conn, error) { + // In worst case this races with Close which is not a very common operation. + for i := 0; i < 1000; i++ { + switch atomic.LoadUint32(&p.state) { + case stateDefault: + cn, err := p.pool.Get(ctx) + if err != nil { + return nil, err + } + if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { + return cn, nil + } + p.pool.Remove(ctx, cn, ErrClosed) + case stateInited: + if err := p.badConnError(); err != nil { + return nil, err + } + cn, ok := <-p.ch + if !ok { + return nil, ErrClosed + } + return cn, nil + case stateClosed: + return nil, ErrClosed + default: + panic("not reached") + } + } + return nil, fmt.Errorf("redis: StickyConnPool.Get: infinite loop") +} + +func (p *StickyConnPool) Put(ctx context.Context, cn *Conn) { + defer func() { + if recover() != nil { + p.freeConn(ctx, cn) + } + }() + p.ch <- cn +} + +func (p *StickyConnPool) freeConn(ctx context.Context, cn *Conn) { + if err := p.badConnError(); err != nil { + p.pool.Remove(ctx, cn, err) + } else { + p.pool.Put(ctx, cn) + } +} + +func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + defer func() { + if recover() != nil { + p.pool.Remove(ctx, cn, ErrClosed) + } + }() + p._badConnError.Store(BadConnError{wrapped: reason}) + p.ch <- cn +} + +func (p *StickyConnPool) Close() error { + if shared := atomic.AddInt32(&p.shared, -1); shared > 0 { + return nil + } + + for i := 0; i < 1000; i++ { + state := atomic.LoadUint32(&p.state) + if state == stateClosed { + return ErrClosed + } + if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) { + close(p.ch) + cn, ok := <-p.ch + if ok { + p.freeConn(context.TODO(), cn) + } + return nil + } + } + + return errors.New("redis: StickyConnPool.Close: infinite loop") +} + +func (p *StickyConnPool) Reset(ctx context.Context) error { + if p.badConnError() == nil { + return nil + } + + select { + case cn, ok := <-p.ch: + if !ok { + return ErrClosed + } + p.pool.Remove(ctx, cn, ErrClosed) + p._badConnError.Store(BadConnError{wrapped: nil}) + default: + return errors.New("redis: StickyConnPool does not have a Conn") + } + + if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) { + state := atomic.LoadUint32(&p.state) + return fmt.Errorf("redis: invalid StickyConnPool state: %d", state) + } + + return nil +} + +func (p *StickyConnPool) badConnError() error { + if v := p._badConnError.Load(); v != nil { + if err := v.(BadConnError); err.wrapped != nil { + return err + } + } + return nil +} + +func (p *StickyConnPool) Len() int { + switch atomic.LoadUint32(&p.state) { + case stateDefault: + return 0 + case stateInited: + return 1 + case stateClosed: + return 0 + default: + panic("not reached") + } +} + +func (p *StickyConnPool) IdleLen() int { + return len(p.ch) +} + +func (p *StickyConnPool) Stats() *Stats { + return &Stats{} +} diff --git a/vendor/github.com/go-redis/redis/v8/internal/pool/ya.make b/vendor/github.com/go-redis/redis/v8/internal/pool/ya.make new file mode 100644 index 0000000000..31e095aba1 --- /dev/null +++ b/vendor/github.com/go-redis/redis/v8/internal/pool/ya.make @@ -0,0 +1,22 @@ +GO_LIBRARY() + +LICENSE(BSD-2-Clause) + +SRCS( + conn.go + pool.go + pool_single.go + pool_sticky.go +) + +GO_TEST_SRCS(export_test.go) + +GO_XTEST_SRCS( + bench_test.go + main_test.go + pool_test.go +) + +END() + +RECURSE(gotest) |