From 77e7a08c4dda720bad098e61fe8d6aa83bcd96ad Mon Sep 17 00:00:00 2001 From: Varakh Date: Thu, 23 May 2024 18:41:48 +0200 Subject: [PATCH] feature(locking): add proper locking and overhaul existing locking service --- README.md | 28 +++++++++ go.mod | 2 +- server/app.go | 13 ++++- server/service_lock.go | 66 +++++++++++++++++++-- server/service_lock_mem.go | 55 ++++++++++-------- server/service_lock_redis.go | 107 +++++++++++++++++++++++++++++++++++ server/service_task.go | 3 +- 7 files changed, 242 insertions(+), 32 deletions(-) create mode 100644 server/service_lock_redis.go diff --git a/README.md b/README.md index 09d660b..8087fde 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,34 @@ LOGGING_LEVEL=debug Please look into the `_doc/` folder for [OpenAPI specification](./_doc/api.yaml) and a Postman Collection. +### Lock service + +The `lockService` can be used to lock resources. This works in-memory and also in a distributed fashion with REDIS. + +Ensure to provide proper locking options when using, although in-memory ignores those. + +Example: + +```shell +# invoked from an endpoint +context := c.Request.Context() + +var err error +var lock appLock + +if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil { + _ = c.AbortWithError(errToHttpStatus(err), err) + return +} +# defer to avoid leakage +defer func(lock appLock) { + _ = lock.unlock(context) +}(lock) + +# simulate long running task +time.Sleep(20 * time.Second) +``` + ### Getting started 1. Run `make clean dependencies` to fetch dependencies diff --git a/go.mod b/go.mod index 1fffc68..e3f70fb 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-co-op/gocron-redis-lock/v2 v2.0.1 github.com/go-co-op/gocron/v2 v2.5.0 github.com/go-playground/validator/v10 v10.20.0 + github.com/go-redsync/redsync/v4 v4.11.0 github.com/go-resty/resty/v2 v2.13.1 github.com/google/uuid v1.6.0 github.com/redis/go-redis/v9 v9.5.1 @@ -39,7 +40,6 @@ require ( github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-redsync/redsync/v4 v4.11.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/server/app.go b/server/app.go index 40c87c2..52efa60 100644 --- a/server/app.go +++ b/server/app.go @@ -49,7 +49,18 @@ func Start() { actionRepo := newActionDbRepo(env.db) actionInvocationRepo := newActionInvocationDbRepo(env.db) - lockService := newLockMemService() + var lockService lockService + + if env.lockConfig.redisEnabled { + var err error + lockService, err = newLockRedisService(env.lockConfig) + + if err != nil { + zap.L().Fatal("Failed to create lock service", zap.Error(err)) + } + } else { + lockService = newLockMemService() + } eventService := newEventService(eventRepo) updateService := newUpdateService(updateRepo, eventService) diff --git a/server/service_lock.go b/server/service_lock.go index 9b7baf1..5bd4e37 100644 --- a/server/service_lock.go +++ b/server/service_lock.go @@ -1,9 +1,65 @@ package server +import ( + "context" + "math" + "time" +) + +// lockService provides methods for locking resources, behavior depends on underlying implementation type lockService interface { - init() error - tryLock(resource string) error - release(resource string) error - exists(resource string) bool - stop() + // lock locks a resource applying default TTL values (no options for in-memory locking, otherwise applies default implementation of other lockers) + lock(ctx context.Context, resource string) (appLock, error) + + // lockWithOptions locks a resource with given options, has no effect for in-memory locking + lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) +} + +type appLock interface { + // unlock unlocks a lock + unlock(ctx context.Context) error +} + +type appLockOption interface { + Apply(l *appLockOptions) +} + +type appLockOptionFunc func(o *appLockOptions) + +func (f appLockOptionFunc) Apply(o *appLockOptions) { + f(o) +} + +type appLockOptions struct { + ttl *time.Duration + retryDelay *time.Duration + maxRetries *int +} + +func withAppLockOptionExpiry(expiry time.Duration) appLockOption { + return appLockOptionFunc(func(o *appLockOptions) { + o.ttl = &expiry + }) +} + +func withAppLockOptionRetries(retries int) appLockOption { + return appLockOptionFunc(func(o *appLockOptions) { + o.maxRetries = &retries + }) +} + +var ( + appLockOptionMaxRetries = math.MaxInt32 +) + +func withAppLockOptionInfiniteRetries() appLockOption { + return appLockOptionFunc(func(o *appLockOptions) { + o.maxRetries = &appLockOptionMaxRetries + }) +} + +func withAppLockOptionRetryDelay(retryDelay time.Duration) appLockOption { + return appLockOptionFunc(func(o *appLockOptions) { + o.retryDelay = &retryDelay + }) } diff --git a/server/service_lock_mem.go b/server/service_lock_mem.go index b627fa6..01ef4eb 100644 --- a/server/service_lock_mem.go +++ b/server/service_lock_mem.go @@ -1,6 +1,8 @@ package server import ( + "context" + "errors" "git.myservermanager.com/varakh/upda/util" "go.uber.org/zap" ) @@ -9,44 +11,51 @@ type lockMemService struct { registry *util.InMemoryLockRegistry } +var ( + errLockMemNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock")) +) + func newLockMemService() lockService { + zap.L().Info("Initialized in-memory locking service") return &lockMemService{registry: util.NewInMemoryLockRegistry()} } -func (s *lockMemService) init() error { - zap.L().Info("Initialized in-memory locking service") - return nil -} - -func (s *lockMemService) tryLock(resource string) error { +func (s *lockMemService) lock(ctx context.Context, resource string) (appLock, error) { if resource == "" { - return errorValidationNotBlank + return nil, errorValidationNotBlank } zap.L().Sugar().Debugf("Trying to lock '%s'", resource) + s.registry.Lock(resource) + zap.L().Sugar().Debugf("Locked '%s'", resource) - return nil -} - -func (s *lockMemService) release(resource string) error { - if resource == "" { - return errorValidationNotBlank + l := &inMemoryLock{ + registry: s.registry, + resource: resource, } - zap.L().Sugar().Debugf("Releasing lock '%s'", resource) - err := s.registry.Unlock(resource) - zap.L().Sugar().Debugf("Released lock '%s'", resource) - - return err + return l, nil } -func (s *lockMemService) exists(resource string) bool { - return s.registry.Exists(resource) +func (s *lockMemService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) { + // ignores any options + return s.lock(ctx, resource) } -func (s *lockMemService) stop() { - zap.L().Info("Clearing in-memory locking service") - s.registry.Clear() +var _ appLock = (*inMemoryLock)(nil) + +type inMemoryLock struct { + registry *util.InMemoryLockRegistry + resource string +} + +func (r inMemoryLock) unlock(ctx context.Context) error { + zap.L().Sugar().Debugf("Unlocking '%s'", r.resource) + + if err := r.registry.Unlock(r.resource); err != nil { + return errLockMemNotReleased + } + return nil } diff --git a/server/service_lock_redis.go b/server/service_lock_redis.go new file mode 100644 index 0000000..11b3c57 --- /dev/null +++ b/server/service_lock_redis.go @@ -0,0 +1,107 @@ +package server + +import ( + "context" + "errors" + "fmt" + "github.com/go-redsync/redsync/v4" + redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v9" + "github.com/redis/go-redis/v9" + "go.uber.org/zap" +) + +type lockRedisService struct { + rs *redsync.Redsync +} + +var ( + errLockRedisNotObtained = newServiceError(Conflict, errors.New("lock service: could not obtain lock")) + errLockRedisNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock")) +) + +func newLockRedisService(lc *lockConfig) (lockService, error) { + zap.L().Info("Initialized REDIS locking service") + + var err error + var redisOptions *redis.Options + redisOptions, err = redis.ParseURL(lc.redisUrl) + + if err != nil { + zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error()) + } + + c := redis.NewClient(redisOptions) + if err = c.Ping(context.Background()).Err(); err != nil { + return nil, newServiceError(General, fmt.Errorf("lock service: failed to connect to REDIS. Reason: %w", err)) + } + + pool := redsyncgoredis.NewPool(c) + rs := redsync.New(pool) + + return &lockRedisService{rs: rs}, nil +} + +func (s *lockRedisService) lock(ctx context.Context, resource string) (appLock, error) { + return s.lockWithOptions(ctx, resource, nil) +} + +func (s *lockRedisService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) { + if resource == "" { + return nil, errorValidationNotBlank + } + + var rsOptions []redsync.Option + + if options != nil { + lockOptions := &appLockOptions{} + for _, o := range options { + o.Apply(lockOptions) + } + + if lockOptions.ttl != nil { + rsOptions = append(rsOptions, redsync.WithExpiry(*lockOptions.ttl)) + } + if lockOptions.maxRetries != nil { + rsOptions = append(rsOptions, redsync.WithTries(*lockOptions.maxRetries)) + } + if lockOptions.retryDelay != nil { + rsOptions = append(rsOptions, redsync.WithRetryDelay(*lockOptions.retryDelay)) + } + } + + mu := s.rs.NewMutex(resource, rsOptions...) + + zap.L().Sugar().Debugf("Trying to lock '%s'", resource) + + if err := mu.LockContext(ctx); err != nil { + return nil, errLockRedisNotObtained + } + + zap.L().Sugar().Debugf("Locked '%s'", resource) + + l := &redisLock{ + mu: mu, + } + + return l, nil +} + +var _ appLock = (*redisLock)(nil) + +type redisLock struct { + mu *redsync.Mutex +} + +func (r redisLock) unlock(ctx context.Context) error { + zap.L().Sugar().Debugf("Unlocking '%s'", r.mu.Name()) + + unlocked, err := r.mu.UnlockContext(ctx) + if err != nil { + return errLockRedisNotReleased + } + if !unlocked { + return errLockRedisNotReleased + } + + return nil +} diff --git a/server/service_task.go b/server/service_task.go index f8d03cd..1359d12 100644 --- a/server/service_task.go +++ b/server/service_task.go @@ -69,7 +69,7 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act redisClient := redis.NewClient(redisOptions) var locker gocron.Locker - if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1)); err != nil { + if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil { zap.L().Sugar().Fatalf("Cannot set up REDIS locker. Reason: %s", err.Error()) } @@ -111,7 +111,6 @@ func (s *taskService) stop() { if err := s.scheduler.Shutdown(); err != nil { zap.L().Sugar().Warnf("Cannot shut down scheduler. Reason: %v", err) } - s.lockService.stop() zap.L().Info("Stopped all periodic tasks") }