feature(locking): add proper locking and overhaul existing locking service (#34)
All checks were successful: build (push) succeeded in 3m11s

Reviewed-on: #34
Co-authored-by: Varakh <varakh@varakh.de>
Co-committed-by: Varakh <varakh@varakh.de>

parent 95074b2a86, commit 165b992629
10 changed files with 415 additions and 68 deletions
README.md (+28)

@@ -37,6 +37,34 @@ LOGGING_LEVEL=debug
 
 Please look into the `_doc/` folder for the [OpenAPI specification](./_doc/api.yaml) and a Postman Collection.
 
+### Lock service
+
+The `lockService` can be used to lock resources. It works in-memory and also in a distributed fashion with REDIS.
+
+Be sure to provide proper locking options when using it; the in-memory implementation only honors the expiry option.
+
+Example:
+
+```go
+// invoked from an endpoint
+context := c.Request.Context()
+
+var err error
+var lock appLock
+
+if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil {
+	_ = c.AbortWithError(errToHttpStatus(err), err)
+	return
+}
+
+// defer to avoid leakage
+defer func(lock appLock) {
+	_ = lock.unlock(context)
+}(lock)
+
+// simulate long running task
+time.Sleep(20 * time.Second)
+```
+
 ### Getting started
 
 1. Run `make clean dependencies` to fetch dependencies
go.mod (+1 -1)

@@ -12,6 +12,7 @@ require (
 	github.com/go-co-op/gocron-redis-lock/v2 v2.0.1
 	github.com/go-co-op/gocron/v2 v2.5.0
 	github.com/go-playground/validator/v10 v10.20.0
+	github.com/go-redsync/redsync/v4 v4.11.0
 	github.com/go-resty/resty/v2 v2.13.1
 	github.com/google/uuid v1.6.0
 	github.com/redis/go-redis/v9 v9.5.1
@@ -39,7 +40,6 @@ require (
 	github.com/gin-contrib/sse v0.1.0 // indirect
 	github.com/go-playground/locales v0.14.1 // indirect
 	github.com/go-playground/universal-translator v0.18.1 // indirect
-	github.com/go-redsync/redsync/v4 v4.11.0 // indirect
 	github.com/goccy/go-json v0.10.2 // indirect
 	github.com/hashicorp/errwrap v1.1.0 // indirect
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -49,7 +49,18 @@ func Start() {
 	actionRepo := newActionDbRepo(env.db)
 	actionInvocationRepo := newActionInvocationDbRepo(env.db)
 
-	lockService := newLockMemService()
+	var lockService lockService
+
+	if env.lockConfig.redisEnabled {
+		var err error
+		lockService, err = newLockRedisService(env.lockConfig)
+
+		if err != nil {
+			zap.L().Fatal("Failed to create lock service", zap.Error(err))
+		}
+	} else {
+		lockService = newLockMemService()
+	}
 
 	eventService := newEventService(eventRepo)
 	updateService := newUpdateService(updateRepo, eventService)
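For context, a minimal sketch of how `env.lockConfig` above could be populated. The `redisEnabled` and `redisUrl` field names appear elsewhere in this commit; the helper and the environment variable names are illustrative assumptions, not part of the change.

```go
package server

import "os"

// newLockConfigFromEnv is a hypothetical helper, not part of this commit.
// LOCK_REDIS_ENABLED and LOCK_REDIS_URL are assumed variable names.
func newLockConfigFromEnv() *lockConfig {
	return &lockConfig{
		redisEnabled: os.Getenv("LOCK_REDIS_ENABLED") == "true",
		redisUrl:     os.Getenv("LOCK_REDIS_URL"), // e.g. redis://localhost:6379/0
	}
}
```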
@@ -1,9 +1,65 @@
 package server
 
-type lockService interface {
-	init() error
-	tryLock(resource string) error
-	release(resource string) error
-	exists(resource string) bool
-	stop()
+import (
+	"context"
+	"math"
+	"time"
+)
+
+// lockService provides methods for locking resources; behavior depends on the underlying implementation
+type lockService interface {
+	// lock locks a resource applying default options (varies between implementations)
+	lock(ctx context.Context, resource string) (appLock, error)
+
+	// lockWithOptions locks a resource with the given options; not all options are applied (varies between implementations)
+	lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error)
+}
+
+type appLock interface {
+	// unlock unlocks a lock
+	unlock(ctx context.Context) error
+}
+
+type appLockOption interface {
+	apply(l *appLockOptions)
+}
+
+type appLockOptionFunc func(o *appLockOptions)
+
+func (f appLockOptionFunc) apply(o *appLockOptions) {
+	f(o)
+}
+
+type appLockOptions struct {
+	expiry     *time.Duration
+	retryDelay *time.Duration
+	maxRetries *int
+}
+
+func withAppLockOptionExpiry(expiry time.Duration) appLockOption {
+	return appLockOptionFunc(func(o *appLockOptions) {
+		o.expiry = &expiry
+	})
+}
+
+func withAppLockOptionRetries(retries int) appLockOption {
+	return appLockOptionFunc(func(o *appLockOptions) {
+		o.maxRetries = &retries
+	})
+}
+
+var (
+	appLockOptionMaxRetries = math.MaxInt32
+)
+
+func withAppLockOptionInfiniteRetries() appLockOption {
+	return appLockOptionFunc(func(o *appLockOptions) {
+		o.maxRetries = &appLockOptionMaxRetries
+	})
+}
+
+func withAppLockOptionRetryDelay(retryDelay time.Duration) appLockOption {
+	return appLockOptionFunc(func(o *appLockOptions) {
+		o.retryDelay = &retryDelay
+	})
 }
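The interface above relies on the functional options pattern. A minimal sketch (same `server` package assumed) of how implementations resolve the variadic options, and how a caller could bundle several of them; `resolveOptions` and `shortLivedLockOptions` are illustrative helpers, not part of the commit.

```go
package server

import "time"

// resolveOptions folds a variadic option list into one appLockOptions value;
// both lock service implementations in this commit do the equivalent inline.
func resolveOptions(options ...appLockOption) *appLockOptions {
	resolved := &appLockOptions{}
	for _, o := range options {
		o.apply(resolved)
	}
	return resolved
}

// shortLivedLockOptions bundles options for a brief, bounded critical section.
func shortLivedLockOptions() []appLockOption {
	return []appLockOption{
		withAppLockOptionExpiry(30 * time.Second),
		withAppLockOptionRetries(3),
	}
}
```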
@@ -1,52 +1,75 @@
 package server
 
 import (
+	"context"
+	"errors"
 	"git.myservermanager.com/varakh/upda/util"
 	"go.uber.org/zap"
+	"time"
 )
 
 type lockMemService struct {
 	registry *util.InMemoryLockRegistry
 }
 
+var (
+	errLockMemNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock"))
+)
+
 func newLockMemService() lockService {
+	zap.L().Info("Initialized in-memory locking service")
 	return &lockMemService{registry: util.NewInMemoryLockRegistry()}
 }
 
-func (s *lockMemService) init() error {
-	zap.L().Info("Initialized in-memory locking service")
-	return nil
+// lock locks a given resource without any options (default expiration)
+func (s *lockMemService) lock(ctx context.Context, resource string) (appLock, error) {
+	return s.lockWithOptions(ctx, resource, withAppLockOptionExpiry(0))
 }
 
-func (s *lockMemService) tryLock(resource string) error {
+// lockWithOptions locks a given resource; only the TTL option is supported
+func (s *lockMemService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) {
 	if resource == "" {
-		return errorValidationNotBlank
+		return nil, errorValidationNotBlank
 	}
 
+	var expiration time.Duration
+	if options != nil {
+		lockOptions := &appLockOptions{}
+		for _, o := range options {
+			o.apply(lockOptions)
+		}
+
+		if lockOptions.expiry != nil {
+			expiration = *lockOptions.expiry
+		}
+	}
+
 	zap.L().Sugar().Debugf("Trying to lock '%s'", resource)
-	s.registry.Lock(resource)
+
+	s.registry.LockWithTTL(resource, expiration)
+
 	zap.L().Sugar().Debugf("Locked '%s'", resource)
-	return nil
-}
 
-func (s *lockMemService) release(resource string) error {
-	if resource == "" {
-		return errorValidationNotBlank
+	l := &inMemoryLock{
+		registry: s.registry,
+		resource: resource,
 	}
 
-	zap.L().Sugar().Debugf("Releasing lock '%s'", resource)
-	err := s.registry.Unlock(resource)
-	zap.L().Sugar().Debugf("Released lock '%s'", resource)
-	return err
-}
+	return l, nil
+}
 
-func (s *lockMemService) exists(resource string) bool {
-	return s.registry.Exists(resource)
-}
+var _ appLock = (*inMemoryLock)(nil)
 
-func (s *lockMemService) stop() {
-	zap.L().Info("Clearing in-memory locking service")
-	s.registry.Clear()
+type inMemoryLock struct {
+	registry *util.InMemoryLockRegistry
+	resource string
+}
+
+func (r inMemoryLock) unlock(ctx context.Context) error {
+	zap.L().Sugar().Debugf("Unlocking '%s'", r.resource)
+
+	if err := r.registry.Unlock(r.resource); err != nil {
+		return errLockMemNotReleased
+	}
+	return nil
 }
server/service_lock_mem_test.go (new file, +29)

@@ -0,0 +1,29 @@
+package server
+
+import (
+	"context"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+const (
+	testLockName = "test_lock"
+)
+
+func TestLockExpiresAndCannotBeReleased(t *testing.T) {
+	a := assert.New(t)
+
+	s := newLockMemService()
+	ctx := context.Background()
+
+	lock, lockErr := s.lockWithOptions(ctx, testLockName, withAppLockOptionExpiry(250*time.Millisecond))
+	a.Nil(lockErr)
+	a.NotNil(lock)
+
+	time.Sleep(251 * time.Millisecond)
+
+	unlockErr := lock.unlock(ctx)
+	a.NotNil(unlockErr)
+	a.ErrorContains(unlockErr, "could not release lock")
+}
server/service_lock_redis.go (new file, +109)

@@ -0,0 +1,109 @@
+package server
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/go-redsync/redsync/v4"
+	redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v9"
+	"github.com/redis/go-redis/v9"
+	"go.uber.org/zap"
+)
+
+type lockRedisService struct {
+	rs *redsync.Redsync
+}
+
+var (
+	errLockRedisNotObtained = newServiceError(Conflict, errors.New("lock service: could not obtain lock"))
+	errLockRedisNotReleased = newServiceError(Conflict, errors.New("lock service: could not release lock"))
+)
+
+func newLockRedisService(lc *lockConfig) (lockService, error) {
+	zap.L().Info("Initialized REDIS locking service")
+
+	redisOptions, err := redis.ParseURL(lc.redisUrl)
+	if err != nil {
+		zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error())
+	}
+
+	c := redis.NewClient(redisOptions)
+	if err = c.Ping(context.Background()).Err(); err != nil {
+		return nil, newServiceError(General, fmt.Errorf("lock service: failed to connect to REDIS. Reason: %w", err))
+	}
+
+	pool := redsyncgoredis.NewPool(c)
+	rs := redsync.New(pool)
+
+	return &lockRedisService{rs: rs}, nil
+}
+
+// lock locks a given resource without any options
+func (s *lockRedisService) lock(ctx context.Context, resource string) (appLock, error) {
+	return s.lockWithOptions(ctx, resource)
+}
+
+// lockWithOptions locks a given resource, applying all given options
+func (s *lockRedisService) lockWithOptions(ctx context.Context, resource string, options ...appLockOption) (appLock, error) {
+	if resource == "" {
+		return nil, errorValidationNotBlank
+	}
+
+	var rsOptions []redsync.Option
+
+	if options != nil {
+		lockOptions := &appLockOptions{}
+		for _, o := range options {
+			o.apply(lockOptions)
+		}
+
+		if lockOptions.expiry != nil {
+			rsOptions = append(rsOptions, redsync.WithExpiry(*lockOptions.expiry))
+		}
+		if lockOptions.maxRetries != nil {
+			rsOptions = append(rsOptions, redsync.WithTries(*lockOptions.maxRetries))
+		}
+		if lockOptions.retryDelay != nil {
+			rsOptions = append(rsOptions, redsync.WithRetryDelay(*lockOptions.retryDelay))
+		}
+	}
+
+	mu := s.rs.NewMutex(resource, rsOptions...)
+
+	zap.L().Sugar().Debugf("Trying to lock '%s'", resource)
+
+	if err := mu.LockContext(ctx); err != nil {
+		return nil, errLockRedisNotObtained
+	}
+
+	zap.L().Sugar().Debugf("Locked '%s'", resource)
+
+	l := &redisLock{
+		mu: mu,
+	}
+
+	return l, nil
+}
+
+var _ appLock = (*redisLock)(nil)
+
+type redisLock struct {
+	mu *redsync.Mutex
+}
+
+func (r redisLock) unlock(ctx context.Context) error {
+	zap.L().Sugar().Debugf("Unlocking '%s'", r.mu.Name())
+
+	unlocked, err := r.mu.UnlockContext(ctx)
+	if err != nil || !unlocked {
+		return errLockRedisNotReleased
+	}
+
+	return nil
+}
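A usage sketch for the REDIS-backed service, assumed to sit in the same package; the URL, resource name, and option values are placeholders:

```go
package server

import (
	"context"
	"time"
)

func exampleRedisLock() error {
	// In the application this config comes from env.lockConfig (see Start()).
	svc, err := newLockRedisService(&lockConfig{redisUrl: "redis://localhost:6379/0"})
	if err != nil {
		return err
	}

	ctx := context.Background()
	lock, err := svc.lockWithOptions(ctx, "reports-rebuild",
		withAppLockOptionExpiry(time.Minute),
		withAppLockOptionRetries(10),
		withAppLockOptionRetryDelay(500*time.Millisecond))
	if err != nil {
		return err // errLockRedisNotObtained maps to a Conflict service error
	}
	defer func() { _ = lock.unlock(ctx) }()

	// ... critical section ...
	return nil
}
```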
@@ -26,12 +26,12 @@ type taskService struct {
 }
 
 const (
-	JobUpdatesCleanStale = "UPDATES_CLEAN_STALE"
-	JobEventsCleanStale  = "EVENTS_CLEAN_STALE"
-	JobActionsEnqueue    = "ACTIONS_ENQUEUE"
-	JobActionsInvoke     = "ACTIONS_INVOKE"
-	JobActionsCleanStale = "ACTIONS_CLEAN_STALE"
-	JobPrometheusRefresh = "PROMETHEUS_REFRESH"
+	jobUpdatesCleanStale = "UPDATES_CLEAN_STALE"
+	jobEventsCleanStale  = "EVENTS_CLEAN_STALE"
+	jobActionsEnqueue    = "ACTIONS_ENQUEUE"
+	jobActionsInvoke     = "ACTIONS_INVOKE"
+	jobActionsCleanStale = "ACTIONS_CLEAN_STALE"
+	jobPrometheusRefresh = "PROMETHEUS_REFRESH"
 )
 
 var (
@@ -69,7 +69,7 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act
 	redisClient := redis.NewClient(redisOptions)
 
 	var locker gocron.Locker
-	if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1)); err != nil {
+	if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil {
 		zap.L().Sugar().Fatalf("Cannot set up REDIS locker. Reason: %s", err.Error())
 	}
 
@@ -111,7 +111,6 @@ func (s *taskService) stop() {
 	if err := s.scheduler.Shutdown(); err != nil {
 		zap.L().Sugar().Warnf("Cannot shut down scheduler. Reason: %v", err)
 	}
-	s.lockService.stop()
 	zap.L().Info("Stopped all periodic tasks")
 }
@@ -145,7 +144,7 @@ func (s *taskService) configureCleanupStaleUpdatesTask() {
 	}
 
 	scheduledJob := gocron.DurationJob(s.taskConfig.updateCleanStaleInterval)
-	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(JobUpdatesCleanStale)); err != nil {
+	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobUpdatesCleanStale)); err != nil {
 		zap.L().Sugar().Fatalf("Could not create task for cleaning stale updates. Reason: %s", err.Error())
 	}
 }
@@ -175,7 +174,7 @@ func (s *taskService) configureCleanupStaleEventsTask() {
 	}
 
 	scheduledJob := gocron.DurationJob(s.taskConfig.eventCleanStaleInterval)
-	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(JobEventsCleanStale)); err != nil {
+	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobEventsCleanStale)); err != nil {
 		zap.L().Sugar().Fatalf("Could not create task for cleaning stale events. Reason: %s", err.Error())
 	}
 }
@@ -192,7 +191,7 @@ func (s *taskService) configureActionsEnqueueTask() {
 	}
 
 	scheduledJob := gocron.DurationJob(s.taskConfig.actionsEnqueueInterval)
-	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(JobActionsEnqueue)); err != nil {
+	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsEnqueue)); err != nil {
 		zap.L().Sugar().Fatalf("Could not create task for enqueueing actions. Reason: %s", err.Error())
 	}
 }
@@ -209,7 +208,7 @@ func (s *taskService) configureActionsInvokeTask() {
 	}
 
 	scheduledJob := gocron.DurationJob(s.taskConfig.actionsInvokeInterval)
-	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(JobActionsInvoke)); err != nil {
+	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsInvoke)); err != nil {
 		zap.L().Sugar().Fatalf("Could not create task for invoking actions. Reason: %s", err.Error())
 	}
 }
@@ -246,7 +245,7 @@ func (s *taskService) configureCleanupStaleActionsTask() {
 	}
 
 	scheduledJob := gocron.DurationJob(s.taskConfig.actionsCleanStaleInterval)
-	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(JobActionsCleanStale)); err != nil {
+	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsCleanStale)); err != nil {
 		zap.L().Sugar().Fatalf("Could not create task for cleaning stale actions. Reason: %s", err.Error())
 	}
 }
@@ -310,7 +309,7 @@ func (s *taskService) configurePrometheusRefreshTask() {
 	}
 
 	scheduledJob := gocron.DurationJob(s.taskConfig.prometheusRefreshInterval)
-	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(JobPrometheusRefresh)); err != nil {
+	if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobPrometheusRefresh)); err != nil {
 		zap.L().Sugar().Fatalf("Could not create task for refreshing prometheus. Reason: %s", err.Error())
 	}
 }
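The extra locker options matter because the locker is attached to the gocron scheduler, so with several instances running, each named job fires on only one of them per tick. A sketch of that wiring under this commit's dependencies; `newLockedScheduler` is an illustrative helper, and gocron v2's `WithDistributedLocker` is assumed as the attachment point:

```go
package server

import (
	"time"

	redislock "github.com/go-co-op/gocron-redis-lock/v2"
	"github.com/go-co-op/gocron/v2"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)

func newLockedScheduler(redisClient *redis.Client) gocron.Scheduler {
	locker, err := redislock.NewRedisLocker(redisClient,
		redislock.WithTries(1),               // do not queue behind a running instance
		redislock.WithExpiry(30*time.Second), // lock auto-expires if a node dies mid-job
		redislock.WithRetryDelay(5*time.Second))
	if err != nil {
		zap.L().Sugar().Fatalf("Cannot set up REDIS locker. Reason: %s", err.Error())
	}

	scheduler, err := gocron.NewScheduler(gocron.WithDistributedLocker(locker))
	if err != nil {
		zap.L().Sugar().Fatalf("Cannot create scheduler. Reason: %s", err.Error())
	}
	return scheduler
}
```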
@@ -1,19 +1,28 @@
 package util
 
-import "sync"
-
 import (
 	"errors"
+	"sync"
 	"sync/atomic"
+	"time"
 )
 
+const (
+	MemoryLockerDefaultExpiration time.Duration = 0
+	MemoryLockerNoExpiration                    = -1
+)
+
 // ErrorNoSuchLock is returned when the requested lock does not exist
 var ErrorNoSuchLock = errors.New("no such lock")
 
 // InMemoryLockRegistry provides a locking mechanism based on the passed in reference name
 type InMemoryLockRegistry struct {
 	mu    sync.Mutex
 	locks map[string]*lockCtr
+
+	defaultExpiry int64
 }
 
 // lockCtr is used by InMemoryLockRegistry to represent a lock with a given name.
@@ -22,6 +31,8 @@ type lockCtr struct {
 	// waiters is the number of waiters waiting to acquire the lock
 	// this is int32 instead of uint32, so we can add `-1` in `dec()`
 	waiters int32
+
+	// expires is the time when the lock expires, in Unix nanoseconds; values <= 0 mean no expiration
+	expires int64
 }
 
 // inc increments the number of waiters waiting for the lock
@@ -52,40 +63,55 @@ func (l *lockCtr) Unlock() {
 // NewInMemoryLockRegistry creates a new InMemoryLockRegistry
 func NewInMemoryLockRegistry() *InMemoryLockRegistry {
 	return &InMemoryLockRegistry{
-		locks: make(map[string]*lockCtr),
+		defaultExpiry: MemoryLockerNoExpiration,
+		locks:         make(map[string]*lockCtr),
 	}
 }
 
 // Clear clears all locks by initializing a new map
-func (l *InMemoryLockRegistry) Clear() {
-	l.locks = make(map[string]*lockCtr)
+func (r *InMemoryLockRegistry) Clear() {
+	r.locks = make(map[string]*lockCtr)
 }
 
-// Exists exists a lock by name
-func (l *InMemoryLockRegistry) Exists(name string) bool {
-	l.mu.Lock()
-	_, exists := l.locks[name]
-	l.mu.Unlock()
+// Exists reports whether a lock with the given name exists
+func (r *InMemoryLockRegistry) Exists(name string) bool {
+	r.deleteExpired()
+
+	r.mu.Lock()
+	_, exists := r.locks[name]
+	r.mu.Unlock()
 	return exists
 }
 
-// Lock locks a mutex with the given name. If it doesn't exist, one is created
-func (l *InMemoryLockRegistry) Lock(name string) {
-	l.mu.Lock()
-	if l.locks == nil {
-		l.locks = make(map[string]*lockCtr)
-	}
+// Lock locks a mutex with the given name and no expiration. If it doesn't exist, one is created.
+func (r *InMemoryLockRegistry) Lock(name string) {
+	r.LockWithTTL(name, MemoryLockerDefaultExpiration)
+}
 
-	nameLock, exists := l.locks[name]
+// LockWithTTL locks a mutex with the given name and duration. If it doesn't exist, one is created. If the duration is greater than 0, an expiration is added.
+func (r *InMemoryLockRegistry) LockWithTTL(name string, duration time.Duration) {
+	r.deleteExpired()
+
+	r.mu.Lock()
+	if r.locks == nil {
+		r.locks = make(map[string]*lockCtr)
+	}
+
+	nameLock, exists := r.locks[name]
 	if !exists {
-		nameLock = &lockCtr{}
-		l.locks[name] = nameLock
+		e := r.defaultExpiry
+		if duration > 0 {
+			e = time.Now().Add(duration).UnixNano()
+		}
+
+		nameLock = &lockCtr{expires: e}
+		r.locks[name] = nameLock
 	}
 
 	// increment the nameLock waiters while inside the main mutex
 	// this makes sure that the lock isn't deleted if `Lock` and `Unlock` are called concurrently
 	nameLock.inc()
-	l.mu.Unlock()
+	r.mu.Unlock()
 
 	// Lock the nameLock outside the main mutex, so we don't block other operations
 	// once locked then we can decrement the number of waiters for this lock
@@ -95,19 +121,42 @@ func (l *InMemoryLockRegistry) Lock(name string) {
 
 // Unlock unlocks the mutex with the given name
 // If the given lock is not being waited on by any other callers, it is deleted
-func (l *InMemoryLockRegistry) Unlock(name string) error {
-	l.mu.Lock()
-	nameLock, exists := l.locks[name]
+func (r *InMemoryLockRegistry) Unlock(name string) error {
+	r.deleteExpired()
+
+	r.mu.Lock()
+	nameLock, exists := r.locks[name]
 	if !exists {
-		l.mu.Unlock()
+		r.mu.Unlock()
 		return ErrorNoSuchLock
 	}
 
 	if nameLock.count() == 0 {
-		delete(l.locks, name)
+		delete(r.locks, name)
 	}
 	nameLock.Unlock()
 
-	l.mu.Unlock()
+	r.mu.Unlock()
 	return nil
 }
+
+// deleteExpired deletes entries whose expiration value is greater than 0 (expiration enabled) and whose expiry time has passed. This is a costly operation and is guarded by the global registry mutex.
+func (r *InMemoryLockRegistry) deleteExpired() {
+	now := time.Now().UnixNano()
+
+	r.mu.Lock()
+	for k, v := range r.locks {
+		if v.expires > 0 && now > v.expires {
+			if v.count() == 0 {
+				delete(r.locks, k)
+			}
+			v.Unlock()
+		}
+	}
+	r.mu.Unlock()
+}
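To illustrate the waiter counting noted in the comments above: a second `Lock` on the same name blocks until the holder calls `Unlock`, and the map entry is removed only once no waiters remain. A small sketch (illustrative, same package):

```go
package util

// exampleContention demonstrates two goroutines contending on one name.
func exampleContention() {
	r := NewInMemoryLockRegistry()
	r.Lock("resource")

	done := make(chan struct{})
	go func() {
		r.Lock("resource") // blocks until the first holder unlocks
		_ = r.Unlock("resource")
		close(done)
	}()

	_ = r.Unlock("resource")
	<-done // the second goroutine acquired and released the lock
}
```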
util/locker_memory_test.go (new file, +43)

@@ -0,0 +1,43 @@
+package util
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+const (
+	testLockName = "test_lock"
+)
+
+func TestLockExpires(t *testing.T) {
+	a := assert.New(t)
+
+	r := NewInMemoryLockRegistry()
+	r.LockWithTTL(testLockName, 250*time.Millisecond)
+	a.True(r.Exists(testLockName))
+
+	time.Sleep(251 * time.Millisecond)
+	a.False(r.Exists(testLockName))
+}
+
+func TestLockNeverExpires(t *testing.T) {
+	a := assert.New(t)
+
+	r := NewInMemoryLockRegistry()
+	r.Lock(testLockName)
+	a.True(r.Exists(testLockName))
+
+	time.Sleep(2 * time.Second)
+	a.True(r.Exists(testLockName))
+}
+
+func TestLockLocksAndUnlocks(t *testing.T) {
+	a := assert.New(t)
+
+	r := NewInMemoryLockRegistry()
+	r.LockWithTTL(testLockName, 250*time.Millisecond)
+	a.True(r.Exists(testLockName))
+
+	_ = r.Unlock(testLockName)
+	a.False(r.Exists(testLockName))
+}