Minor refactor tackling typos, overhauling README, adding hints about useful resources to README, and avoid any panic/Fatalf from services and init calls #noissue
All checks were successful
/ build (push) Successful in 5m12s

This commit is contained in:
Varakh 2024-06-03 21:11:33 +02:00
parent 302d0b1ad4
commit 87684fd755
9 changed files with 218 additions and 170 deletions

View file

@ -12,15 +12,7 @@ Contributions are very welcome!
## Development & contribution ## Development & contribution
* Ensure to set the following environment variables for proper debug logs during development * Pay attention to `make checkstyle` (uses `go vet ./...`); pipeline fails if issues are detected.
```shell
DEVELOPMENT=true
LOGGING_ENCODING=console
LOGGING_LEVEL=debug
```
* Code guidelines
* Each entity has its own repository * Each entity has its own repository
* Each entity is only used in repository and service (otherwise, mapping happens) * Each entity is only used in repository and service (otherwise, mapping happens)
* Presenter layer is constructed from the entity, e.g., in REST responses and mapped * Presenter layer is constructed from the entity, e.g., in REST responses and mapped
@ -28,47 +20,29 @@ LOGGING_LEVEL=debug
* All log calls should be handled by `zap.L()` * All log calls should be handled by `zap.L()`
* Configuration is bootstrapped via separated `struct` types which are given to the service which need them * Configuration is bootstrapped via separated `struct` types which are given to the service which need them
* Error handling * Error handling
* Always throw an error with `NewServiceError` * Always throw an error with `NewServiceError` for repositories, services and handlers
* Always wrap the cause error with `fmt.Errorf` * Always throw an error wrapping the cause with `fmt.Errorf`
* Forward/bubble up the error directly, when original error is already a `NewServiceError` (most likely internal * Forward/bubble up the error directly, when original error is already a `NewServiceError` (most likely internal
calls) calls)
* Always abort handler chain with `AbortWithError` * Always abort handler chain with `AbortWithError`
* Utils can throw any error * Utils can throw any error
* Repositories, handlers and services should always properly return `error` including any `init`-like function (
Please look into the `_doc/` folder for [OpenAPI specification](./_doc/api.yaml) and a Postman Collection. best to avoid them and call in `newXXX`). **Do not abort with `Fatalf` or similar**
* `log.Fatalf` or `zap.L().Fatal` is allowed in `environment.go` or `app.go`
**Pay attention to `make checkstyle` (invoked `go vet ./...`) output as pipeline will fail if issues are detected.** * Look into the `_doc/` folder for [OpenAPI specification](./_doc/api.yaml) and a Postman Collection.
* Consider reading [Effective Go](https://go.dev/doc/effective_go)
### Lock service * Consider reading [100 Go Mistakes and How to Avoid Them](https://100go.co/)
The `lockService` can be used to lock resources. This works in-memory and also in a distributed fashion with REDIS.
Ensure to provide proper locking options when using, although in-memory ignores those.
Example:
```shell
# invoked from an endpoint
context := c.Request.Context()
var err error
var lock appLock
if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil {
_ = c.AbortWithError(errToHttpStatus(err), err)
return
}
# defer to avoid leakage
defer func(lock appLock) {
_ = lock.unlock(context)
}(lock)
# simulate long running task
time.Sleep(20 * time.Second)
```
### Getting started ### Getting started
Ensure to set the following environment variables for proper debug logs during development
```shell
DEVELOPMENT=true
LOGGING_ENCODING=console
LOGGING_LEVEL=debug
```
1. Run `make clean dependencies` to fetch dependencies 1. Run `make clean dependencies` to fetch dependencies
2. Start `git.myservermanager.com/varakh/upda/cmd/server` (or `cli`) as Go application and ensure to have _required_ 2. Start `git.myservermanager.com/varakh/upda/cmd/server` (or `cli`) as Go application and ensure to have _required_
environment variables set environment variables set
@ -100,6 +74,34 @@ path.
For any `go` command you run, ensure that your `PATH` has the `gcc` binary and that you add `CGO_ENABLED=1` as For any `go` command you run, ensure that your `PATH` has the `gcc` binary and that you add `CGO_ENABLED=1` as
environment. environment.
### Using the `lockService` correctly
The `lockService` can be used to lock resources. This works in-memory and also in a distributed fashion with REDIS.
Ensure to provide proper locking options when using, although in-memory ignores those.
Example:
```shell
# invoked from an endpoint
context := c.Request.Context()
var err error
var lock appLock
if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil {
_ = c.AbortWithError(errToHttpStatus(err), err)
return
}
# defer to avoid leakage
defer func(lock appLock) {
_ = lock.unlock(context)
}(lock)
# simulate long running task
time.Sleep(20 * time.Second)
```
### Release ### Release
Releases are handled by the SCM platform and pipeline. Creating a **new git tag**, creates a new release in the SCM Releases are handled by the SCM platform and pipeline. Creating a **new git tag**, creates a new release in the SCM

View file

@ -31,16 +31,20 @@ func Start() {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }
// app init (router, services, handlers)
router := gin.New() router := gin.New()
router.Use(ginzap.Ginzap(zap.L(), time.RFC3339, false)) router.Use(ginzap.Ginzap(zap.L(), time.RFC3339, false))
router.Use(ginzap.RecoveryWithZap(zap.L(), true)) router.Use(ginzap.RecoveryWithZap(zap.L(), true))
// metrics var err error
prometheusService := newPrometheusService(router, env.prometheusConfig)
ps := newPrometheusService(router, env.prometheusConfig)
if env.prometheusConfig.enabled { if env.prometheusConfig.enabled {
prometheusService.init() if err = ps.init(); err != nil {
router.Use(prometheusService.prometheus.Instrument()) zap.L().Sugar().Fatalf("Prometheus service init failed: %s", err.Error())
}
router.Use(ps.prometheus.Instrument())
} }
updateRepo := newUpdateDbRepo(env.db) updateRepo := newUpdateDbRepo(env.db)
@ -50,43 +54,51 @@ func Start() {
actionRepo := newActionDbRepo(env.db) actionRepo := newActionDbRepo(env.db)
actionInvocationRepo := newActionInvocationDbRepo(env.db) actionInvocationRepo := newActionInvocationDbRepo(env.db)
var lockService lockService var ls lockService
if env.lockConfig.redisEnabled { if env.lockConfig.redisEnabled {
var err error var e error
lockService, err = newLockRedisService(env.lockConfig) ls, e = newLockRedisService(env.lockConfig)
if err != nil { if err != nil {
zap.L().Fatal("Failed to create lock service", zap.Error(err)) zap.L().Fatal("Failed to create lock service", zap.Error(e))
} }
} else { } else {
lockService = newLockMemService() ls = newLockMemService()
} }
eventService := newEventService(eventRepo) es := newEventService(eventRepo)
updateService := newUpdateService(updateRepo, eventService) us := newUpdateService(updateRepo, es)
webhookService := newWebhookService(webhookRepo, env.webhookConfig) ws := newWebhookService(webhookRepo, env.webhookConfig)
webhookInvocationService := newWebhookInvocationService(webhookService, updateService, env.webhookConfig) wis := newWebhookInvocationService(ws, us, env.webhookConfig)
secretService := newSecretService(secretRepo) ss := newSecretService(secretRepo)
actionService := newActionService(actionRepo, eventService) as := newActionService(actionRepo, es)
actionInvocationService := newActionInvocationService(actionInvocationRepo, actionService, eventService, secretService) ais := newActionInvocationService(actionInvocationRepo, as, es, ss)
taskService := newTaskService(updateService, eventService, webhookService, actionService, actionInvocationService, lockService, prometheusService, env.appConfig, env.taskConfig, env.lockConfig, env.prometheusConfig) var ts *taskService
taskService.init()
taskService.start()
updateHandler := newUpdateHandler(updateService, env.appConfig) if ts, err = newTaskService(us, es, ws, as, ais, ls, ps, env.appConfig, env.taskConfig, env.lockConfig, env.prometheusConfig); err != nil {
webhookHandler := newWebhookHandler(webhookService) zap.L().Sugar().Fatalf("Task service creation failed: %v", err)
webhookInvocationHandler := newWebhookInvocationHandler(webhookInvocationService, webhookService) }
eventHandler := newEventHandler(eventService)
secretHandler := newSecretHandler(secretService)
actionHandler := newActionHandler(actionService)
actionInvocationHandler := newActionInvocationHandler(actionService, actionInvocationService)
infoHandler := newInfoHandler(env.appConfig) if err = ts.init(); err != nil {
healthHandler := newHealthHandler() zap.L().Sugar().Fatalf("Task service initialization failed: %v", err)
authHandler := newAuthHandler() }
ts.start()
uh := newUpdateHandler(us, env.appConfig)
wh := newWebhookHandler(ws)
wih := newWebhookInvocationHandler(wis, ws)
eh := newEventHandler(es)
sh := newSecretHandler(ss)
ah := newActionHandler(as)
aih := newActionInvocationHandler(as, ais)
ih := newInfoHandler(env.appConfig)
hh := newHealthHandler()
authH := newAuthHandler()
router.Use(middlewareAppName()) router.Use(middlewareAppName())
router.Use(middlewareAppVersion()) router.Use(middlewareAppVersion())
@ -102,10 +114,10 @@ func Start() {
})) }))
apiPublicGroup := router.Group("/api/v1") apiPublicGroup := router.Group("/api/v1")
apiPublicGroup.GET("/health", healthHandler.show) apiPublicGroup.GET("/health", hh.show)
apiPublicGroup.GET("/info", infoHandler.show) apiPublicGroup.GET("/info", ih.show)
apiPublicGroup.POST("/webhooks/:id", webhookInvocationHandler.execute) apiPublicGroup.POST("/webhooks/:id", wih.execute)
var authMethodHandler gin.HandlerFunc var authMethodHandler gin.HandlerFunc
@ -121,48 +133,47 @@ func Start() {
apiAuthGroup := router.Group("/api/v1", authMethodHandler) apiAuthGroup := router.Group("/api/v1", authMethodHandler)
apiAuthGroup.GET("/login", authHandler.login) apiAuthGroup.GET("/login", authH.login)
apiAuthGroup.GET("/updates", updateHandler.paginate) apiAuthGroup.GET("/updates", uh.paginate)
apiAuthGroup.GET("/updates/:id", updateHandler.get) apiAuthGroup.GET("/updates/:id", uh.get)
apiAuthGroup.PATCH("/updates/:id/state", updateHandler.updateState) apiAuthGroup.PATCH("/updates/:id/state", uh.updateState)
apiAuthGroup.DELETE("/updates/:id", updateHandler.delete) apiAuthGroup.DELETE("/updates/:id", uh.delete)
apiAuthGroup.GET("/webhooks", webhookHandler.paginate) apiAuthGroup.GET("/webhooks", wh.paginate)
apiAuthGroup.POST("/webhooks", webhookHandler.create) apiAuthGroup.POST("/webhooks", wh.create)
apiAuthGroup.GET("/webhooks/:id", webhookHandler.get) apiAuthGroup.GET("/webhooks/:id", wh.get)
apiAuthGroup.PATCH("/webhooks/:id/label", webhookHandler.updateLabel) apiAuthGroup.PATCH("/webhooks/:id/label", wh.updateLabel)
apiAuthGroup.PATCH("/webhooks/:id/ignore-host", webhookHandler.updateIgnoreHost) apiAuthGroup.PATCH("/webhooks/:id/ignore-host", wh.updateIgnoreHost)
apiAuthGroup.DELETE("/webhooks/:id", webhookHandler.delete) apiAuthGroup.DELETE("/webhooks/:id", wh.delete)
apiAuthGroup.GET("/events", eventHandler.window) apiAuthGroup.GET("/events", eh.window)
apiAuthGroup.GET("/events/:id", eventHandler.get) apiAuthGroup.GET("/events/:id", eh.get)
apiAuthGroup.DELETE("/events/:id", eventHandler.delete) apiAuthGroup.DELETE("/events/:id", eh.delete)
apiAuthGroup.GET("/secrets", secretHandler.getAll) apiAuthGroup.GET("/secrets", sh.getAll)
apiAuthGroup.GET("/secrets/:id", secretHandler.get) apiAuthGroup.GET("/secrets/:id", sh.get)
apiAuthGroup.POST("/secrets", secretHandler.create) apiAuthGroup.POST("/secrets", sh.create)
apiAuthGroup.PATCH("/secrets/:id/value", secretHandler.updateValue) apiAuthGroup.PATCH("/secrets/:id/value", sh.updateValue)
apiAuthGroup.DELETE("/secrets/:id", secretHandler.delete) apiAuthGroup.DELETE("/secrets/:id", sh.delete)
apiAuthGroup.GET("/actions", actionHandler.paginate) apiAuthGroup.GET("/actions", ah.paginate)
apiAuthGroup.POST("/actions", actionHandler.create) apiAuthGroup.POST("/actions", ah.create)
apiAuthGroup.GET("/actions/:id", actionHandler.get) apiAuthGroup.GET("/actions/:id", ah.get)
apiAuthGroup.PATCH("/actions/:id/label", actionHandler.updateLabel) apiAuthGroup.PATCH("/actions/:id/label", ah.updateLabel)
apiAuthGroup.PATCH("/actions/:id/match-event", actionHandler.updateMatchEvent) apiAuthGroup.PATCH("/actions/:id/match-event", ah.updateMatchEvent)
apiAuthGroup.PATCH("/actions/:id/match-host", actionHandler.updateMatchHost) apiAuthGroup.PATCH("/actions/:id/match-host", ah.updateMatchHost)
apiAuthGroup.PATCH("/actions/:id/match-application", actionHandler.updateMatchApplication) apiAuthGroup.PATCH("/actions/:id/match-application", ah.updateMatchApplication)
apiAuthGroup.PATCH("/actions/:id/match-provider", actionHandler.updateMatchProvider) apiAuthGroup.PATCH("/actions/:id/match-provider", ah.updateMatchProvider)
apiAuthGroup.PATCH("/actions/:id/payload", actionHandler.updatePayload) apiAuthGroup.PATCH("/actions/:id/payload", ah.updatePayload)
apiAuthGroup.PATCH("/actions/:id/enabled", actionHandler.updateEnabled) apiAuthGroup.PATCH("/actions/:id/enabled", ah.updateEnabled)
apiAuthGroup.DELETE("/actions/:id", actionHandler.delete) apiAuthGroup.DELETE("/actions/:id", ah.delete)
apiAuthGroup.POST("/actions/:id/test", actionInvocationHandler.test) apiAuthGroup.POST("/actions/:id/test", aih.test)
apiAuthGroup.GET("/action-invocations", actionInvocationHandler.paginate) apiAuthGroup.GET("/action-invocations", aih.paginate)
apiAuthGroup.GET("/action-invocations/:id", actionInvocationHandler.get) apiAuthGroup.GET("/action-invocations/:id", aih.get)
apiAuthGroup.DELETE("/action-invocations/:id", actionInvocationHandler.delete) apiAuthGroup.DELETE("/action-invocations/:id", aih.delete)
// start server
serverAddress := fmt.Sprintf("%s:%d", env.serverConfig.listen, env.serverConfig.port) serverAddress := fmt.Sprintf("%s:%d", env.serverConfig.listen, env.serverConfig.port)
srv := &http.Server{ srv := &http.Server{
Addr: serverAddress, Addr: serverAddress,
@ -170,16 +181,16 @@ func Start() {
} }
go func() { go func() {
var err error var e error
if env.serverConfig.tlsEnabled { if env.serverConfig.tlsEnabled {
err = srv.ListenAndServeTLS(env.serverConfig.tlsCertPath, env.serverConfig.tlsKeyPath) e = srv.ListenAndServeTLS(env.serverConfig.tlsCertPath, env.serverConfig.tlsKeyPath)
} else { } else {
err = srv.ListenAndServe() e = srv.ListenAndServe()
} }
if err != nil && !errors.Is(err, http.ErrServerClosed) { if e != nil && !errors.Is(e, http.ErrServerClosed) {
zap.L().Sugar().Fatalf("Application cannot be started: %v", err) zap.L().Sugar().Fatalf("Application cannot be started: %v", e)
} }
}() }()
@ -193,11 +204,11 @@ func Start() {
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit <-quit
zap.L().Info("Shutting down...") zap.L().Info("Shutting down...")
taskService.stop() ts.stop()
ctx, cancel := context.WithTimeout(context.Background(), env.serverConfig.timeout) ctx, cancel := context.WithTimeout(context.Background(), env.serverConfig.timeout)
defer cancel() defer cancel()
if err := srv.Shutdown(ctx); err != nil { if err = srv.Shutdown(ctx); err != nil {
zap.L().Sugar().Fatalf("Shutdown failed, exited directly: %v", err) zap.L().Sugar().Fatalf("Shutdown failed, exited directly: %v", err)
} }
// catching ctx.Done() for configured timeout // catching ctx.Done() for configured timeout

View file

@ -14,7 +14,7 @@ import (
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
) )
// JSONMap defined JSON data type, need to implements driver.Valuer, sql.Scanner interface // JSONMap defined JSON data type, need to implement driver.Valuer, sql.Scanner interface
type JSONMap map[string]interface { type JSONMap map[string]interface {
} }
@ -27,7 +27,7 @@ func (m JSONMap) Value() (driver.Value, error) {
return string(ba), err return string(ba), err
} }
// Scan scan value into Jsonb, implements sql.Scanner interface // Scan value into JSONB, implements sql.Scanner interface
func (m *JSONMap) Scan(val interface{}) error { func (m *JSONMap) Scan(val interface{}) error {
if val == nil { if val == nil {
*m = make(JSONMap) *m = make(JSONMap)

View file

@ -165,7 +165,9 @@ func bootstrapEnvironment() *Environment {
} }
zapLogger := zap.Must(zapConfig.Build()) zapLogger := zap.Must(zapConfig.Build())
defer zapLogger.Sync() defer func(zapLogger *zap.Logger) {
_ = zapLogger.Sync()
}(zapLogger)
zap.ReplaceGlobals(zapLogger) zap.ReplaceGlobals(zapLogger)
// assign defaults from given environment variables and validate // assign defaults from given environment variables and validate
@ -323,7 +325,9 @@ func bootstrapEnvironment() *Environment {
gormConfig := &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)} gormConfig := &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)}
if isDebug && isDevelopment { if isDebug && isDevelopment {
gormZapLogger := zap.Must(zapConfig.Build()) gormZapLogger := zap.Must(zapConfig.Build())
defer gormZapLogger.Sync() defer func(gormZapLogger *zap.Logger) {
_ = gormZapLogger.Sync()
}(gormZapLogger)
gormLogger := zapgorm2.New(gormZapLogger) gormLogger := zapgorm2.New(gormZapLogger)
gormConfig = &gorm.Config{Logger: gormLogger} gormConfig = &gorm.Config{Logger: gormLogger}
} }

View file

@ -17,7 +17,7 @@ var (
) )
func newLockMemService() lockService { func newLockMemService() lockService {
zap.L().Info("Initialized in-memory locking service") zap.L().Info("Initializing in-memory locking service")
return &lockMemService{registry: util.NewInMemoryLockRegistry()} return &lockMemService{registry: util.NewInMemoryLockRegistry()}
} }

View file

@ -20,19 +20,19 @@ var (
) )
func newLockRedisService(lc *lockConfig) (lockService, error) { func newLockRedisService(lc *lockConfig) (lockService, error) {
zap.L().Info("Initialized REDIS locking service") zap.L().Info("Initializing REDIS locking service")
var err error var err error
var redisOptions *redis.Options var redisOptions *redis.Options
redisOptions, err = redis.ParseURL(lc.redisUrl) redisOptions, err = redis.ParseURL(lc.redisUrl)
if err != nil { if err != nil {
zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error()) return nil, fmt.Errorf("lock service: cannot parse REDIS URL '%s' to set up locking: %s", lc.redisUrl, err)
} }
c := redis.NewClient(redisOptions) c := redis.NewClient(redisOptions)
if err = c.Ping(context.Background()).Err(); err != nil { if err = c.Ping(context.Background()).Err(); err != nil {
return nil, newServiceError(General, fmt.Errorf("lock service: failed to connect to REDIS. Reason: %w", err)) return nil, fmt.Errorf("lock service: failed to connect to REDIS: %w", err)
} }
pool := redsyncgoredis.NewPool(c) pool := redsyncgoredis.NewPool(c)

View file

@ -1,9 +1,9 @@
package server package server
import ( import (
"fmt"
"github.com/Depado/ginprom" "github.com/Depado/ginprom"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"go.uber.org/zap"
) )
type prometheusService struct { type prometheusService struct {
@ -42,9 +42,9 @@ func newPrometheusService(r *gin.Engine, c *prometheusConfig) *prometheusService
} }
} }
func (s *prometheusService) init() { func (s *prometheusService) init() error {
if !s.config.enabled { if !s.config.enabled {
return return nil
} }
var err error var err error
@ -59,8 +59,10 @@ func (s *prometheusService) init() {
err = s.registerGaugeNoLabels(metricActions, metricActionsHelp) err = s.registerGaugeNoLabels(metricActions, metricActionsHelp)
if err != nil { if err != nil {
zap.L().Sugar().Fatalf("Cannot initialize service. Reason: %v", err) return newServiceError(General, fmt.Errorf("cannot initialize service: %w", err))
} }
return nil
} }
func (s *prometheusService) registerGaugeNoLabels(name string, help string) error { func (s *prometheusService) registerGaugeNoLabels(name string, help string) error {

View file

@ -1,6 +1,7 @@
package server package server
import ( import (
"fmt"
"git.myservermanager.com/varakh/upda/api" "git.myservermanager.com/varakh/upda/api"
redislock "github.com/go-co-op/gocron-redis-lock/v2" redislock "github.com/go-co-op/gocron-redis-lock/v2"
"github.com/go-co-op/gocron/v2" "github.com/go-co-op/gocron/v2"
@ -38,11 +39,11 @@ var (
initialTasksStartDelay = time.Now().Add(10 * time.Second) initialTasksStartDelay = time.Now().Add(10 * time.Second)
) )
func newTaskService(u *updateService, e *eventService, w *webhookService, a *actionService, ai *actionInvocationService, l lockService, p *prometheusService, ac *appConfig, tc *taskConfig, lc *lockConfig, pc *prometheusConfig) *taskService { func newTaskService(u *updateService, e *eventService, w *webhookService, a *actionService, ai *actionInvocationService, l lockService, p *prometheusService, ac *appConfig, tc *taskConfig, lc *lockConfig, pc *prometheusConfig) (*taskService, error) {
location, err := time.LoadLocation(ac.timeZone) var err error
var location *time.Location
if err != nil { if location, err = time.LoadLocation(ac.timeZone); err != nil {
zap.L().Sugar().Fatalf("Could not initialize correct timezone for scheduler. Reason: %s", err.Error()) return nil, fmt.Errorf("could not initialize correct timezone for scheduler: %s", err)
} }
// global job options // global job options
@ -64,13 +65,13 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act
redisOptions, err = redis.ParseURL(lc.redisUrl) redisOptions, err = redis.ParseURL(lc.redisUrl)
if err != nil { if err != nil {
zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error()) return nil, fmt.Errorf("cannot parse REDIS URL '%s' to set up locking for scheduler: %s", lc.redisUrl, err)
} }
redisClient := redis.NewClient(redisOptions) redisClient := redis.NewClient(redisOptions)
var locker gocron.Locker var locker gocron.Locker
if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil { if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil {
zap.L().Sugar().Fatalf("Cannot set up REDIS locker. Reason: %s", err.Error()) return nil, fmt.Errorf("cannot set up REDIS locker for scheduler: %s", err)
} }
schedulerOptions = append(schedulerOptions, gocron.WithDistributedLocker(locker)) schedulerOptions = append(schedulerOptions, gocron.WithDistributedLocker(locker))
@ -91,16 +92,30 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act
lockConfig: lc, lockConfig: lc,
prometheusConfig: pc, prometheusConfig: pc,
scheduler: scheduler, scheduler: scheduler,
} }, nil
} }
func (s *taskService) init() { func (s *taskService) init() error {
s.configureCleanupStaleUpdatesTask() if err := s.configureCleanupStaleUpdatesTask(); err != nil {
s.configureCleanupStaleEventsTask() return err
s.configureActionsEnqueueTask() }
s.configureActionsInvokeTask() if err := s.configureCleanupStaleEventsTask(); err != nil {
s.configureCleanupStaleActionsTask() return err
s.configurePrometheusRefreshTask() }
if err := s.configureActionsEnqueueTask(); err != nil {
return err
}
if err := s.configureActionsInvokeTask(); err != nil {
return err
}
if err := s.configureCleanupStaleActionsTask(); err != nil {
return err
}
if err := s.configurePrometheusRefreshTask(); err != nil {
return err
}
return nil
} }
func (s *taskService) stop() { func (s *taskService) stop() {
@ -119,9 +134,9 @@ func (s *taskService) start() {
zap.L().Sugar().Infof("Started %d periodic tasks", len(s.scheduler.Jobs())) zap.L().Sugar().Infof("Started %d periodic tasks", len(s.scheduler.Jobs()))
} }
func (s *taskService) configureCleanupStaleUpdatesTask() { func (s *taskService) configureCleanupStaleUpdatesTask() error {
if !s.taskConfig.updateCleanStaleEnabled { if !s.taskConfig.updateCleanStaleEnabled {
return return nil
} }
runnable := func() { runnable := func() {
@ -145,13 +160,15 @@ func (s *taskService) configureCleanupStaleUpdatesTask() {
scheduledJob := gocron.DurationJob(s.taskConfig.updateCleanStaleInterval) scheduledJob := gocron.DurationJob(s.taskConfig.updateCleanStaleInterval)
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobUpdatesCleanStale)); err != nil { if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobUpdatesCleanStale)); err != nil {
zap.L().Sugar().Fatalf("Could not create task for cleaning stale updates. Reason: %s", err.Error()) return fmt.Errorf("could not create task for cleaning stale updates: %w", err)
}
} }
func (s *taskService) configureCleanupStaleEventsTask() { return nil
}
func (s *taskService) configureCleanupStaleEventsTask() error {
if !s.taskConfig.eventCleanStaleEnabled { if !s.taskConfig.eventCleanStaleEnabled {
return return nil
} }
runnable := func() { runnable := func() {
@ -175,13 +192,15 @@ func (s *taskService) configureCleanupStaleEventsTask() {
scheduledJob := gocron.DurationJob(s.taskConfig.eventCleanStaleInterval) scheduledJob := gocron.DurationJob(s.taskConfig.eventCleanStaleInterval)
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobEventsCleanStale)); err != nil { if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobEventsCleanStale)); err != nil {
zap.L().Sugar().Fatalf("Could not create task for cleaning stale events. Reason: %s", err.Error()) return fmt.Errorf("could not create task for cleaning stale events: %w", err)
}
} }
func (s *taskService) configureActionsEnqueueTask() { return nil
}
func (s *taskService) configureActionsEnqueueTask() error {
if !s.taskConfig.actionsEnqueueEnabled { if !s.taskConfig.actionsEnqueueEnabled {
return return nil
} }
runnable := func() { runnable := func() {
@ -192,13 +211,15 @@ func (s *taskService) configureActionsEnqueueTask() {
scheduledJob := gocron.DurationJob(s.taskConfig.actionsEnqueueInterval) scheduledJob := gocron.DurationJob(s.taskConfig.actionsEnqueueInterval)
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsEnqueue)); err != nil { if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsEnqueue)); err != nil {
zap.L().Sugar().Fatalf("Could not create task for enqueueing actions. Reason: %s", err.Error()) return fmt.Errorf("could not create task for enqueueing actions: %w", err)
}
} }
func (s *taskService) configureActionsInvokeTask() { return nil
}
func (s *taskService) configureActionsInvokeTask() error {
if !s.taskConfig.actionsInvokeEnabled { if !s.taskConfig.actionsInvokeEnabled {
return return nil
} }
runnable := func() { runnable := func() {
@ -209,13 +230,15 @@ func (s *taskService) configureActionsInvokeTask() {
scheduledJob := gocron.DurationJob(s.taskConfig.actionsInvokeInterval) scheduledJob := gocron.DurationJob(s.taskConfig.actionsInvokeInterval)
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsInvoke)); err != nil { if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsInvoke)); err != nil {
zap.L().Sugar().Fatalf("Could not create task for invoking actions. Reason: %s", err.Error()) return fmt.Errorf("could not create task for invoking actions: %w", err)
}
} }
func (s *taskService) configureCleanupStaleActionsTask() { return nil
}
func (s *taskService) configureCleanupStaleActionsTask() error {
if !s.taskConfig.actionsCleanStaleEnabled { if !s.taskConfig.actionsCleanStaleEnabled {
return return nil
} }
runnable := func() { runnable := func() {
@ -246,13 +269,15 @@ func (s *taskService) configureCleanupStaleActionsTask() {
scheduledJob := gocron.DurationJob(s.taskConfig.actionsCleanStaleInterval) scheduledJob := gocron.DurationJob(s.taskConfig.actionsCleanStaleInterval)
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsCleanStale)); err != nil { if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsCleanStale)); err != nil {
zap.L().Sugar().Fatalf("Could not create task for cleaning stale actions. Reason: %s", err.Error()) return fmt.Errorf("could not create task for cleaning stale actions: %w", err)
}
} }
func (s *taskService) configurePrometheusRefreshTask() { return nil
}
func (s *taskService) configurePrometheusRefreshTask() error {
if !s.prometheusConfig.enabled { if !s.prometheusConfig.enabled {
return return nil
} }
runnable := func() { runnable := func() {
@ -310,6 +335,8 @@ func (s *taskService) configurePrometheusRefreshTask() {
scheduledJob := gocron.DurationJob(s.taskConfig.prometheusRefreshInterval) scheduledJob := gocron.DurationJob(s.taskConfig.prometheusRefreshInterval)
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobPrometheusRefresh)); err != nil { if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobPrometheusRefresh)); err != nil {
zap.L().Sugar().Fatalf("Could not create task for refreshing prometheus. Reason: %s", err.Error()) return fmt.Errorf("could not create task for refreshing prometheus: %w", err)
} }
return nil
} }

View file

@ -23,7 +23,9 @@ func CreateFileWithParent(file string) error {
if _, err = os.Stat(file); errors.Is(err, os.ErrNotExist) { if _, err = os.Stat(file); errors.Is(err, os.ErrNotExist) {
var f *os.File var f *os.File
f, err = os.Create(file) f, err = os.Create(file)
defer f.Close() defer func(f *os.File) {
_ = f.Close()
}(f)
if err != nil { if err != nil {
return errors.New(fmt.Sprintf("cannot create file '%v': %v", file, fmt.Errorf("%w", err))) return errors.New(fmt.Sprintf("cannot create file '%v': %v", file, fmt.Errorf("%w", err)))
} }