108 lines
2.6 KiB
Go
108 lines
2.6 KiB
Go
|
package server
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"github.com/go-redsync/redsync/v4"
|
||
|
redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v9"
|
||
|
"github.com/redis/go-redis/v9"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
type lockRedisService struct {
|
||
|
rs *redsync.Redsync
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
errLockRedisNotObtained = newServiceError(Conflict, errors.New("lock service: could not obtain lock"))
|
||
|
errLockRedisNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock"))
|
||
|
)
|
||
|
|
||
|
func newLockRedisService(lc *lockConfig) (lockService, error) {
|
||
|
zap.L().Info("Initialized REDIS locking service")
|
||
|
|
||
|
var err error
|
||
|
var redisOptions *redis.Options
|
||
|
redisOptions, err = redis.ParseURL(lc.redisUrl)
|
||
|
|
||
|
if err != nil {
|
||
|
zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error())
|
||
|
}
|
||
|
|
||
|
c := redis.NewClient(redisOptions)
|
||
|
if err = c.Ping(context.Background()).Err(); err != nil {
|
||
|
return nil, newServiceError(General, fmt.Errorf("lock service: failed to connect to REDIS. Reason: %w", err))
|
||
|
}
|
||
|
|
||
|
pool := redsyncgoredis.NewPool(c)
|
||
|
rs := redsync.New(pool)
|
||
|
|
||
|
return &lockRedisService{rs: rs}, nil
|
||
|
}
|
||
|
|
||
|
func (s *lockRedisService) lock(ctx context.Context, resource string) (appLock, error) {
|
||
|
return s.lockWithOptions(ctx, resource, nil)
|
||
|
}
|
||
|
|
||
|
func (s *lockRedisService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) {
|
||
|
if resource == "" {
|
||
|
return nil, errorValidationNotBlank
|
||
|
}
|
||
|
|
||
|
var rsOptions []redsync.Option
|
||
|
|
||
|
if options != nil {
|
||
|
lockOptions := &appLockOptions{}
|
||
|
for _, o := range options {
|
||
|
o.Apply(lockOptions)
|
||
|
}
|
||
|
|
||
|
if lockOptions.ttl != nil {
|
||
|
rsOptions = append(rsOptions, redsync.WithExpiry(*lockOptions.ttl))
|
||
|
}
|
||
|
if lockOptions.maxRetries != nil {
|
||
|
rsOptions = append(rsOptions, redsync.WithTries(*lockOptions.maxRetries))
|
||
|
}
|
||
|
if lockOptions.retryDelay != nil {
|
||
|
rsOptions = append(rsOptions, redsync.WithRetryDelay(*lockOptions.retryDelay))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
mu := s.rs.NewMutex(resource, rsOptions...)
|
||
|
|
||
|
zap.L().Sugar().Debugf("Trying to lock '%s'", resource)
|
||
|
|
||
|
if err := mu.LockContext(ctx); err != nil {
|
||
|
return nil, errLockRedisNotObtained
|
||
|
}
|
||
|
|
||
|
zap.L().Sugar().Debugf("Locked '%s'", resource)
|
||
|
|
||
|
l := &redisLock{
|
||
|
mu: mu,
|
||
|
}
|
||
|
|
||
|
return l, nil
|
||
|
}
|
||
|
|
||
|
var _ appLock = (*redisLock)(nil)
|
||
|
|
||
|
type redisLock struct {
|
||
|
mu *redsync.Mutex
|
||
|
}
|
||
|
|
||
|
func (r redisLock) unlock(ctx context.Context) error {
|
||
|
zap.L().Sugar().Debugf("Unlocking '%s'", r.mu.Name())
|
||
|
|
||
|
unlocked, err := r.mu.UnlockContext(ctx)
|
||
|
if err != nil {
|
||
|
return errLockRedisNotReleased
|
||
|
}
|
||
|
if !unlocked {
|
||
|
return errLockRedisNotReleased
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|