feature(locking): add proper locking and overhaul existing locking service #34
7 changed files with 242 additions and 32 deletions
28
README.md
28
README.md
|
@ -37,6 +37,34 @@ LOGGING_LEVEL=debug
|
|||
|
||||
Please look into the `_doc/` folder for [OpenAPI specification](./_doc/api.yaml) and a Postman Collection.
|
||||
|
||||
### Lock service
|
||||
|
||||
The `lockService` can be used to lock resources. This works in-memory and also in a distributed fashion with REDIS.
|
||||
|
||||
Ensure to provide proper locking options when using, although in-memory ignores those.
|
||||
|
||||
Example:
|
||||
|
||||
```shell
|
||||
# invoked from an endpoint
|
||||
context := c.Request.Context()
|
||||
|
||||
var err error
|
||||
var lock appLock
|
||||
|
||||
if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil {
|
||||
_ = c.AbortWithError(errToHttpStatus(err), err)
|
||||
return
|
||||
}
|
||||
# defer to avoid leakage
|
||||
defer func(lock appLock) {
|
||||
_ = lock.unlock(context)
|
||||
}(lock)
|
||||
|
||||
# simulate long running task
|
||||
time.Sleep(20 * time.Second)
|
||||
```
|
||||
|
||||
### Getting started
|
||||
|
||||
1. Run `make clean dependencies` to fetch dependencies
|
||||
|
|
2
go.mod
2
go.mod
|
@ -12,6 +12,7 @@ require (
|
|||
github.com/go-co-op/gocron-redis-lock/v2 v2.0.1
|
||||
github.com/go-co-op/gocron/v2 v2.5.0
|
||||
github.com/go-playground/validator/v10 v10.20.0
|
||||
github.com/go-redsync/redsync/v4 v4.11.0
|
||||
github.com/go-resty/resty/v2 v2.13.1
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/redis/go-redis/v9 v9.5.1
|
||||
|
@ -39,7 +40,6 @@ require (
|
|||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-redsync/redsync/v4 v4.11.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
|
|
|
@ -49,7 +49,18 @@ func Start() {
|
|||
actionRepo := newActionDbRepo(env.db)
|
||||
actionInvocationRepo := newActionInvocationDbRepo(env.db)
|
||||
|
||||
lockService := newLockMemService()
|
||||
var lockService lockService
|
||||
|
||||
if env.lockConfig.redisEnabled {
|
||||
var err error
|
||||
lockService, err = newLockRedisService(env.lockConfig)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Fatal("Failed to create lock service", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
lockService = newLockMemService()
|
||||
}
|
||||
|
||||
eventService := newEventService(eventRepo)
|
||||
updateService := newUpdateService(updateRepo, eventService)
|
||||
|
|
|
@ -1,9 +1,65 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
// lockService provides methods for locking resources, behavior depends on underlying implementation
|
||||
type lockService interface {
|
||||
init() error
|
||||
tryLock(resource string) error
|
||||
release(resource string) error
|
||||
exists(resource string) bool
|
||||
stop()
|
||||
// lock locks a resource applying default TTL values (no options for in-memory locking, otherwise applies default implementation of other lockers)
|
||||
lock(ctx context.Context, resource string) (appLock, error)
|
||||
|
||||
// lockWithOptions locks a resource with given options, has no effect for in-memory locking
|
||||
lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error)
|
||||
}
|
||||
|
||||
type appLock interface {
|
||||
// unlock unlocks a lock
|
||||
unlock(ctx context.Context) error
|
||||
}
|
||||
|
||||
type appLockOption interface {
|
||||
Apply(l *appLockOptions)
|
||||
}
|
||||
|
||||
type appLockOptionFunc func(o *appLockOptions)
|
||||
|
||||
func (f appLockOptionFunc) Apply(o *appLockOptions) {
|
||||
f(o)
|
||||
}
|
||||
|
||||
type appLockOptions struct {
|
||||
ttl *time.Duration
|
||||
retryDelay *time.Duration
|
||||
maxRetries *int
|
||||
}
|
||||
|
||||
func withAppLockOptionExpiry(expiry time.Duration) appLockOption {
|
||||
return appLockOptionFunc(func(o *appLockOptions) {
|
||||
o.ttl = &expiry
|
||||
})
|
||||
}
|
||||
|
||||
func withAppLockOptionRetries(retries int) appLockOption {
|
||||
return appLockOptionFunc(func(o *appLockOptions) {
|
||||
o.maxRetries = &retries
|
||||
})
|
||||
}
|
||||
|
||||
var (
|
||||
appLockOptionMaxRetries = math.MaxInt32
|
||||
)
|
||||
|
||||
func withAppLockOptionInfiniteRetries() appLockOption {
|
||||
return appLockOptionFunc(func(o *appLockOptions) {
|
||||
o.maxRetries = &appLockOptionMaxRetries
|
||||
})
|
||||
}
|
||||
|
||||
func withAppLockOptionRetryDelay(retryDelay time.Duration) appLockOption {
|
||||
return appLockOptionFunc(func(o *appLockOptions) {
|
||||
o.retryDelay = &retryDelay
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"git.myservermanager.com/varakh/upda/util"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -9,44 +11,51 @@ type lockMemService struct {
|
|||
registry *util.InMemoryLockRegistry
|
||||
}
|
||||
|
||||
var (
|
||||
errLockMemNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock"))
|
||||
)
|
||||
|
||||
func newLockMemService() lockService {
|
||||
zap.L().Info("Initialized in-memory locking service")
|
||||
return &lockMemService{registry: util.NewInMemoryLockRegistry()}
|
||||
}
|
||||
|
||||
func (s *lockMemService) init() error {
|
||||
zap.L().Info("Initialized in-memory locking service")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *lockMemService) tryLock(resource string) error {
|
||||
func (s *lockMemService) lock(ctx context.Context, resource string) (appLock, error) {
|
||||
if resource == "" {
|
||||
return errorValidationNotBlank
|
||||
return nil, errorValidationNotBlank
|
||||
}
|
||||
|
||||
zap.L().Sugar().Debugf("Trying to lock '%s'", resource)
|
||||
|
||||
s.registry.Lock(resource)
|
||||
|
||||
zap.L().Sugar().Debugf("Locked '%s'", resource)
|
||||
|
||||
l := &inMemoryLock{
|
||||
registry: s.registry,
|
||||
resource: resource,
|
||||
}
|
||||
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (s *lockMemService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) {
|
||||
// ignores any options
|
||||
return s.lock(ctx, resource)
|
||||
}
|
||||
|
||||
var _ appLock = (*inMemoryLock)(nil)
|
||||
|
||||
type inMemoryLock struct {
|
||||
registry *util.InMemoryLockRegistry
|
||||
resource string
|
||||
}
|
||||
|
||||
func (r inMemoryLock) unlock(ctx context.Context) error {
|
||||
zap.L().Sugar().Debugf("Unlocking '%s'", r.resource)
|
||||
|
||||
if err := r.registry.Unlock(r.resource); err != nil {
|
||||
return errLockMemNotReleased
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *lockMemService) release(resource string) error {
|
||||
if resource == "" {
|
||||
return errorValidationNotBlank
|
||||
}
|
||||
|
||||
zap.L().Sugar().Debugf("Releasing lock '%s'", resource)
|
||||
err := s.registry.Unlock(resource)
|
||||
zap.L().Sugar().Debugf("Released lock '%s'", resource)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *lockMemService) exists(resource string) bool {
|
||||
return s.registry.Exists(resource)
|
||||
}
|
||||
|
||||
func (s *lockMemService) stop() {
|
||||
zap.L().Info("Clearing in-memory locking service")
|
||||
s.registry.Clear()
|
||||
}
|
||||
|
|
107
server/service_lock_redis.go
Normal file
107
server/service_lock_redis.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/go-redsync/redsync/v4"
|
||||
redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v9"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type lockRedisService struct {
|
||||
rs *redsync.Redsync
|
||||
}
|
||||
|
||||
var (
|
||||
errLockRedisNotObtained = newServiceError(Conflict, errors.New("lock service: could not obtain lock"))
|
||||
errLockRedisNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock"))
|
||||
)
|
||||
|
||||
func newLockRedisService(lc *lockConfig) (lockService, error) {
|
||||
zap.L().Info("Initialized REDIS locking service")
|
||||
|
||||
var err error
|
||||
var redisOptions *redis.Options
|
||||
redisOptions, err = redis.ParseURL(lc.redisUrl)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error())
|
||||
}
|
||||
|
||||
c := redis.NewClient(redisOptions)
|
||||
if err = c.Ping(context.Background()).Err(); err != nil {
|
||||
return nil, newServiceError(General, fmt.Errorf("lock service: failed to connect to REDIS. Reason: %w", err))
|
||||
}
|
||||
|
||||
pool := redsyncgoredis.NewPool(c)
|
||||
rs := redsync.New(pool)
|
||||
|
||||
return &lockRedisService{rs: rs}, nil
|
||||
}
|
||||
|
||||
func (s *lockRedisService) lock(ctx context.Context, resource string) (appLock, error) {
|
||||
return s.lockWithOptions(ctx, resource, nil)
|
||||
}
|
||||
|
||||
func (s *lockRedisService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) {
|
||||
if resource == "" {
|
||||
return nil, errorValidationNotBlank
|
||||
}
|
||||
|
||||
var rsOptions []redsync.Option
|
||||
|
||||
if options != nil {
|
||||
lockOptions := &appLockOptions{}
|
||||
for _, o := range options {
|
||||
o.Apply(lockOptions)
|
||||
}
|
||||
|
||||
if lockOptions.ttl != nil {
|
||||
rsOptions = append(rsOptions, redsync.WithExpiry(*lockOptions.ttl))
|
||||
}
|
||||
if lockOptions.maxRetries != nil {
|
||||
rsOptions = append(rsOptions, redsync.WithTries(*lockOptions.maxRetries))
|
||||
}
|
||||
if lockOptions.retryDelay != nil {
|
||||
rsOptions = append(rsOptions, redsync.WithRetryDelay(*lockOptions.retryDelay))
|
||||
}
|
||||
}
|
||||
|
||||
mu := s.rs.NewMutex(resource, rsOptions...)
|
||||
|
||||
zap.L().Sugar().Debugf("Trying to lock '%s'", resource)
|
||||
|
||||
if err := mu.LockContext(ctx); err != nil {
|
||||
return nil, errLockRedisNotObtained
|
||||
}
|
||||
|
||||
zap.L().Sugar().Debugf("Locked '%s'", resource)
|
||||
|
||||
l := &redisLock{
|
||||
mu: mu,
|
||||
}
|
||||
|
||||
return l, nil
|
||||
}
|
||||
|
||||
var _ appLock = (*redisLock)(nil)
|
||||
|
||||
type redisLock struct {
|
||||
mu *redsync.Mutex
|
||||
}
|
||||
|
||||
func (r redisLock) unlock(ctx context.Context) error {
|
||||
zap.L().Sugar().Debugf("Unlocking '%s'", r.mu.Name())
|
||||
|
||||
unlocked, err := r.mu.UnlockContext(ctx)
|
||||
if err != nil {
|
||||
return errLockRedisNotReleased
|
||||
}
|
||||
if !unlocked {
|
||||
return errLockRedisNotReleased
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -69,7 +69,7 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act
|
|||
redisClient := redis.NewClient(redisOptions)
|
||||
|
||||
var locker gocron.Locker
|
||||
if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1)); err != nil {
|
||||
if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil {
|
||||
zap.L().Sugar().Fatalf("Cannot set up REDIS locker. Reason: %s", err.Error())
|
||||
}
|
||||
|
||||
|
@ -111,7 +111,6 @@ func (s *taskService) stop() {
|
|||
if err := s.scheduler.Shutdown(); err != nil {
|
||||
zap.L().Sugar().Warnf("Cannot shut down scheduler. Reason: %v", err)
|
||||
}
|
||||
s.lockService.stop()
|
||||
zap.L().Info("Stopped all periodic tasks")
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue