upda/util/locker_memory.go

163 lines
3.9 KiB
Go
Raw Permalink Normal View History

package util
import (
"sync"
"time"
)
import (
"errors"
"sync/atomic"
)
const (
MemoryLockerDefaultExpiration time.Duration = 0
MemoryLockerNoExpiration = -1
)
// ErrorNoSuchLock is returned when the requested lock does not exist
var ErrorNoSuchLock = errors.New("no such lock")
// InMemoryLockRegistry provides a locking mechanism based on the passed in reference name
type InMemoryLockRegistry struct {
mu sync.Mutex
locks map[string]*lockCtr
defaultExpiry int64
}
// lockCtr is used by InMemoryLockRegistry to represent a lock with a given name.
type lockCtr struct {
mu sync.Mutex
// waiters is the number of waiters waiting to acquire the lock
// this is int32 instead of uint32, so we can add `-1` in `dec()`
waiters int32
// expires is the time when
expires int64
}
// inc increments the number of waiters waiting for the lock
func (l *lockCtr) inc() {
atomic.AddInt32(&l.waiters, 1)
}
// dec decrements the number of waiters waiting on the lock
func (l *lockCtr) dec() {
atomic.AddInt32(&l.waiters, -1)
}
// count gets the current number of waiters
func (l *lockCtr) count() int32 {
return atomic.LoadInt32(&l.waiters)
}
// Lock locks the mutex
func (l *lockCtr) Lock() {
l.mu.Lock()
}
// Unlock unlocks the mutex
func (l *lockCtr) Unlock() {
l.mu.Unlock()
}
// NewInMemoryLockRegistry creates a new InMemoryLockRegistry
func NewInMemoryLockRegistry() *InMemoryLockRegistry {
return &InMemoryLockRegistry{
defaultExpiry: MemoryLockerNoExpiration,
locks: make(map[string]*lockCtr),
}
}
// Clear clears all locks by initializing a new map
func (r *InMemoryLockRegistry) Clear() {
r.locks = make(map[string]*lockCtr)
}
// Exists exists a lock by name
func (r *InMemoryLockRegistry) Exists(name string) bool {
r.deleteExpired()
r.mu.Lock()
_, exists := r.locks[name]
r.mu.Unlock()
return exists
}
// Lock locks a mutex with the given name and no expiration. If it doesn't exist, one is created.
func (r *InMemoryLockRegistry) Lock(name string) {
r.LockWithTTL(name, MemoryLockerDefaultExpiration)
}
// LockWithTTL locks a mutex with the given name and duration. If it doesn't exist, one is created. If duration is greater than 0, expiration is added.
func (r *InMemoryLockRegistry) LockWithTTL(name string, duration time.Duration) {
r.deleteExpired()
r.mu.Lock()
if r.locks == nil {
r.locks = make(map[string]*lockCtr)
}
nameLock, exists := r.locks[name]
if !exists {
e := r.defaultExpiry
if duration > 0 {
e = time.Now().Add(duration).UnixNano()
}
nameLock = &lockCtr{expires: e}
r.locks[name] = nameLock
}
// increment the nameLock waiters while inside the main mutex
// this makes sure that the lock isn't deleted if `Lock` and `Unlock` are called concurrently
nameLock.inc()
r.mu.Unlock()
// Lock the nameLock outside the main mutex, so we don't block other operations
// once locked then we can decrement the number of waiters for this lock
nameLock.Lock()
nameLock.dec()
}
// Unlock unlocks the mutex with the given name
// If the given lock is not being waited on by any other callers, it is deleted
func (r *InMemoryLockRegistry) Unlock(name string) error {
r.deleteExpired()
r.mu.Lock()
nameLock, exists := r.locks[name]
if !exists {
r.mu.Unlock()
return ErrorNoSuchLock
}
if nameLock.count() == 0 {
delete(r.locks, name)
}
nameLock.Unlock()
r.mu.Unlock()
return nil
}
// deleteExpired deletes expired entries if their expiration value is greater than 0 (expiration enabled) and it expired. This is a costly operation and is guarded by the global registry mutex.
func (r *InMemoryLockRegistry) deleteExpired() {
now := time.Now().UnixNano()
r.mu.Lock()
for k, v := range r.locks {
if v.expires > 0 && now > v.expires {
nameLock, exists := r.locks[k]
if !exists {
continue
}
if nameLock.count() == 0 {
delete(r.locks, k)
}
nameLock.Unlock()
}
}
r.mu.Unlock()
}