Compare commits
No commits in common. "e5bdc5add62c810f4f310fc6cacb636d07653178" and "47a48523a91a49751a392e61d39e1b3f067e4e6d" have entirely different histories.
e5bdc5add6
...
47a48523a9
22 changed files with 191 additions and 244 deletions
5
Makefile
5
Makefile
|
@ -55,7 +55,10 @@ checkstyle:
|
||||||
|
|
||||||
checkstyle-ci: checkstyle
|
checkstyle-ci: checkstyle
|
||||||
|
|
||||||
|
checkstyle-ci:
|
||||||
|
go vet ./...
|
||||||
|
|
||||||
test:
|
test:
|
||||||
go test -race ./...
|
go test ./...
|
||||||
|
|
||||||
test-ci: test
|
test-ci: test
|
83
README.md
83
README.md
|
@ -12,7 +12,15 @@ Contributions are very welcome!
|
||||||
|
|
||||||
## Development & contribution
|
## Development & contribution
|
||||||
|
|
||||||
* Pay attention to `make checkstyle` (uses `go vet ./...`); pipeline fails if issues are detected.
|
* Ensure to set the following environment variables for proper debug logs during development
|
||||||
|
|
||||||
|
```shell
|
||||||
|
DEVELOPMENT=true
|
||||||
|
LOGGING_ENCODING=console
|
||||||
|
LOGGING_LEVEL=debug
|
||||||
|
```
|
||||||
|
|
||||||
|
* Code guidelines
|
||||||
* Each entity has its own repository
|
* Each entity has its own repository
|
||||||
* Each entity is only used in repository and service (otherwise, mapping happens)
|
* Each entity is only used in repository and service (otherwise, mapping happens)
|
||||||
* Presenter layer is constructed from the entity, e.g., in REST responses and mapped
|
* Presenter layer is constructed from the entity, e.g., in REST responses and mapped
|
||||||
|
@ -20,30 +28,47 @@ Contributions are very welcome!
|
||||||
* All log calls should be handled by `zap.L()`
|
* All log calls should be handled by `zap.L()`
|
||||||
* Configuration is bootstrapped via separated `struct` types which are given to the service which need them
|
* Configuration is bootstrapped via separated `struct` types which are given to the service which need them
|
||||||
* Error handling
|
* Error handling
|
||||||
* Always throw an error with `NewServiceError` for repositories, services and handlers
|
* Always throw an error with `NewServiceError`
|
||||||
* Always throw an error wrapping the cause with `fmt.Errorf`
|
* Always wrap the cause error with `fmt.Errorf`
|
||||||
* Forward/bubble up the error directly, when original error is already a `NewServiceError` (most likely internal
|
* Forward/bubble up the error directly, when original error is already a `NewServiceError` (most likely internal
|
||||||
calls)
|
calls)
|
||||||
* Always abort handler chain with `AbortWithError`
|
* Always abort handler chain with `AbortWithError`
|
||||||
* Utils can throw any error
|
* Utils can throw any error
|
||||||
* Repositories, handlers and services should always properly return `error` including any `init`-like function (
|
|
||||||
best
|
|
||||||
to avoid them and call in `newXXX`). **Do not abort with `Fatalf` or similar**
|
|
||||||
* `log.Fatalf` or `zap.L().Fatal` is allowed in `environment.go` or `app.go`
|
|
||||||
* Look into the `_doc/` folder for [OpenAPI specification](./_doc/api.yaml) and a Postman Collection.
|
|
||||||
* Consider reading [Effective Go](https://go.dev/doc/effective_go)
|
|
||||||
* Consider reading [100 Go Mistakes and How to Avoid Them](https://100go.co/)
|
|
||||||
|
|
||||||
### Getting started
|
Please look into the `_doc/` folder for [OpenAPI specification](./_doc/api.yaml) and a Postman Collection.
|
||||||
|
|
||||||
Ensure to set the following environment variables for proper debug logs during development
|
**Pay attention to `make checkstyle` (invoked `go vet ./...`) output as pipeline will fail if issues are detected.**
|
||||||
|
|
||||||
|
### Lock service
|
||||||
|
|
||||||
|
The `lockService` can be used to lock resources. This works in-memory and also in a distributed fashion with REDIS.
|
||||||
|
|
||||||
|
Ensure to provide proper locking options when using, although in-memory ignores those.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
DEVELOPMENT=true
|
# invoked from an endpoint
|
||||||
LOGGING_ENCODING=console
|
context := c.Request.Context()
|
||||||
LOGGING_LEVEL=debug
|
|
||||||
|
var err error
|
||||||
|
var lock appLock
|
||||||
|
|
||||||
|
if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil {
|
||||||
|
_ = c.AbortWithError(errToHttpStatus(err), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
# defer to avoid leakage
|
||||||
|
defer func(lock appLock) {
|
||||||
|
_ = lock.unlock(context)
|
||||||
|
}(lock)
|
||||||
|
|
||||||
|
# simulate long running task
|
||||||
|
time.Sleep(20 * time.Second)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Getting started
|
||||||
|
|
||||||
1. Run `make clean dependencies` to fetch dependencies
|
1. Run `make clean dependencies` to fetch dependencies
|
||||||
2. Start `git.myservermanager.com/varakh/upda/cmd/server` (or `cli`) as Go application and ensure to have _required_
|
2. Start `git.myservermanager.com/varakh/upda/cmd/server` (or `cli`) as Go application and ensure to have _required_
|
||||||
environment variables set
|
environment variables set
|
||||||
|
@ -75,34 +100,6 @@ path.
|
||||||
For any `go` command you run, ensure that your `PATH` has the `gcc` binary and that you add `CGO_ENABLED=1` as
|
For any `go` command you run, ensure that your `PATH` has the `gcc` binary and that you add `CGO_ENABLED=1` as
|
||||||
environment.
|
environment.
|
||||||
|
|
||||||
### Using the `lockService` correctly
|
|
||||||
|
|
||||||
The `lockService` can be used to lock resources. This works in-memory and also in a distributed fashion with REDIS.
|
|
||||||
|
|
||||||
Ensure to provide proper locking options when using, although in-memory ignores those.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
|
|
||||||
```shell
|
|
||||||
# invoked from an endpoint
|
|
||||||
context := c.Request.Context()
|
|
||||||
|
|
||||||
var err error
|
|
||||||
var lock appLock
|
|
||||||
|
|
||||||
if lock, err = h.lockService.lockWithOptions(context, "TEST-LOCK", withAppLockOptionExpiry(5*time.Minute), withAppLockOptionInfiniteRetries(), withAppLockOptionRetryDelay(5*time.Second)); err != nil {
|
|
||||||
_ = c.AbortWithError(errToHttpStatus(err), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
# defer to avoid leakage
|
|
||||||
defer func(lock appLock) {
|
|
||||||
_ = lock.unlock(context)
|
|
||||||
}(lock)
|
|
||||||
|
|
||||||
# simulate long running task
|
|
||||||
time.Sleep(20 * time.Second)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Release
|
### Release
|
||||||
|
|
||||||
Releases are handled by the SCM platform and pipeline. Creating a **new git tag**, creates a new release in the SCM
|
Releases are handled by the SCM platform and pipeline. Creating a **new git tag**, creates a new release in the SCM
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -20,7 +20,6 @@ require (
|
||||||
github.com/redis/go-redis/v9 v9.5.2
|
github.com/redis/go-redis/v9 v9.5.2
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
github.com/urfave/cli/v2 v2.27.2
|
github.com/urfave/cli/v2 v2.27.2
|
||||||
go.uber.org/automaxprocs v1.5.3
|
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
gorm.io/driver/postgres v1.5.7
|
gorm.io/driver/postgres v1.5.7
|
||||||
gorm.io/driver/sqlite v1.5.5
|
gorm.io/driver/sqlite v1.5.5
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -191,8 +191,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
||||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
||||||
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
|
|
||||||
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
|
|
||||||
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
|
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
|
||||||
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
|
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
|
||||||
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
|
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
|
||||||
|
@ -252,8 +250,6 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
|
||||||
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
|
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
|
||||||
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||||
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
|
|
||||||
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
|
|
||||||
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (h *actionHandler) paginate(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var data []*api.ActionResponse
|
var data []*api.ActionResponse
|
||||||
data = make([]*api.ActionResponse, 0, len(actions))
|
data = make([]*api.ActionResponse, 0)
|
||||||
|
|
||||||
for _, e := range actions {
|
for _, e := range actions {
|
||||||
data = append(data, &api.ActionResponse{
|
data = append(data, &api.ActionResponse{
|
||||||
|
|
|
@ -56,7 +56,7 @@ func (h *actionInvocationHandler) paginate(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var data []*api.ActionInvocationResponse
|
var data []*api.ActionInvocationResponse
|
||||||
data = make([]*api.ActionInvocationResponse, 0, len(actionInvocations))
|
data = make([]*api.ActionInvocationResponse, 0)
|
||||||
|
|
||||||
for _, e := range actionInvocations {
|
for _, e := range actionInvocations {
|
||||||
data = append(data, &api.ActionInvocationResponse{
|
data = append(data, &api.ActionInvocationResponse{
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (h *eventHandler) window(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var data []*api.EventResponse
|
var data []*api.EventResponse
|
||||||
data = make([]*api.EventResponse, 0, len(events))
|
data = make([]*api.EventResponse, 0)
|
||||||
|
|
||||||
for _, e := range events {
|
for _, e := range events {
|
||||||
data = append(data, &api.EventResponse{
|
data = append(data, &api.EventResponse{
|
||||||
|
|
|
@ -24,7 +24,7 @@ func (h *secretHandler) getAll(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var data []*api.SecretResponse
|
var data []*api.SecretResponse
|
||||||
data = make([]*api.SecretResponse, 0, len(secrets))
|
data = make([]*api.SecretResponse, 0)
|
||||||
|
|
||||||
for _, e := range secrets {
|
for _, e := range secrets {
|
||||||
data = append(data, &api.SecretResponse{
|
data = append(data, &api.SecretResponse{
|
||||||
|
|
|
@ -28,7 +28,7 @@ func (h *updateHandler) paginate(c *gin.Context) {
|
||||||
|
|
||||||
s, stateQueryContainsAtLeastOne := c.GetQueryArray("state")
|
s, stateQueryContainsAtLeastOne := c.GetQueryArray("state")
|
||||||
|
|
||||||
states := make([]api.UpdateState, 0)
|
var states []api.UpdateState
|
||||||
if stateQueryContainsAtLeastOne {
|
if stateQueryContainsAtLeastOne {
|
||||||
for _, state := range s {
|
for _, state := range s {
|
||||||
states = append(states, api.UpdateState(state))
|
states = append(states, api.UpdateState(state))
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (h *webhookHandler) paginate(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var data []*api.WebhookResponse
|
var data []*api.WebhookResponse
|
||||||
data = make([]*api.WebhookResponse, 0, len(webhooks))
|
data = make([]*api.WebhookResponse, 0)
|
||||||
|
|
||||||
for _, e := range webhooks {
|
for _, e := range webhooks {
|
||||||
data = append(data, &api.WebhookResponse{
|
data = append(data, &api.WebhookResponse{
|
||||||
|
|
160
server/app.go
160
server/app.go
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/gin-contrib/cors"
|
"github.com/gin-contrib/cors"
|
||||||
ginzap "github.com/gin-contrib/zap"
|
ginzap "github.com/gin-contrib/zap"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
_ "go.uber.org/automaxprocs"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -31,20 +30,16 @@ func Start() {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// app init (router, services, handlers)
|
|
||||||
router := gin.New()
|
router := gin.New()
|
||||||
router.Use(ginzap.Ginzap(zap.L(), time.RFC3339, false))
|
router.Use(ginzap.Ginzap(zap.L(), time.RFC3339, false))
|
||||||
router.Use(ginzap.RecoveryWithZap(zap.L(), true))
|
router.Use(ginzap.RecoveryWithZap(zap.L(), true))
|
||||||
|
|
||||||
var err error
|
// metrics
|
||||||
|
prometheusService := newPrometheusService(router, env.prometheusConfig)
|
||||||
ps := newPrometheusService(router, env.prometheusConfig)
|
|
||||||
|
|
||||||
if env.prometheusConfig.enabled {
|
if env.prometheusConfig.enabled {
|
||||||
if err = ps.init(); err != nil {
|
prometheusService.init()
|
||||||
zap.L().Sugar().Fatalf("Prometheus service init failed: %s", err.Error())
|
router.Use(prometheusService.prometheus.Instrument())
|
||||||
}
|
|
||||||
router.Use(ps.prometheus.Instrument())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
updateRepo := newUpdateDbRepo(env.db)
|
updateRepo := newUpdateDbRepo(env.db)
|
||||||
|
@ -54,51 +49,43 @@ func Start() {
|
||||||
actionRepo := newActionDbRepo(env.db)
|
actionRepo := newActionDbRepo(env.db)
|
||||||
actionInvocationRepo := newActionInvocationDbRepo(env.db)
|
actionInvocationRepo := newActionInvocationDbRepo(env.db)
|
||||||
|
|
||||||
var ls lockService
|
var lockService lockService
|
||||||
|
|
||||||
if env.lockConfig.redisEnabled {
|
if env.lockConfig.redisEnabled {
|
||||||
var e error
|
var err error
|
||||||
ls, e = newLockRedisService(env.lockConfig)
|
lockService, err = newLockRedisService(env.lockConfig)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Fatal("Failed to create lock service", zap.Error(e))
|
zap.L().Fatal("Failed to create lock service", zap.Error(err))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ls = newLockMemService()
|
lockService = newLockMemService()
|
||||||
}
|
}
|
||||||
|
|
||||||
es := newEventService(eventRepo)
|
eventService := newEventService(eventRepo)
|
||||||
us := newUpdateService(updateRepo, es)
|
updateService := newUpdateService(updateRepo, eventService)
|
||||||
ws := newWebhookService(webhookRepo, env.webhookConfig)
|
webhookService := newWebhookService(webhookRepo, env.webhookConfig)
|
||||||
wis := newWebhookInvocationService(ws, us, env.webhookConfig)
|
webhookInvocationService := newWebhookInvocationService(webhookService, updateService, env.webhookConfig)
|
||||||
|
|
||||||
ss := newSecretService(secretRepo)
|
secretService := newSecretService(secretRepo)
|
||||||
as := newActionService(actionRepo, es)
|
actionService := newActionService(actionRepo, eventService)
|
||||||
ais := newActionInvocationService(actionInvocationRepo, as, es, ss)
|
actionInvocationService := newActionInvocationService(actionInvocationRepo, actionService, eventService, secretService)
|
||||||
|
|
||||||
var ts *taskService
|
taskService := newTaskService(updateService, eventService, webhookService, actionService, actionInvocationService, lockService, prometheusService, env.appConfig, env.taskConfig, env.lockConfig, env.prometheusConfig)
|
||||||
|
taskService.init()
|
||||||
|
taskService.start()
|
||||||
|
|
||||||
if ts, err = newTaskService(us, es, ws, as, ais, ls, ps, env.appConfig, env.taskConfig, env.lockConfig, env.prometheusConfig); err != nil {
|
updateHandler := newUpdateHandler(updateService, env.appConfig)
|
||||||
zap.L().Sugar().Fatalf("Task service creation failed: %v", err)
|
webhookHandler := newWebhookHandler(webhookService)
|
||||||
}
|
webhookInvocationHandler := newWebhookInvocationHandler(webhookInvocationService, webhookService)
|
||||||
|
eventHandler := newEventHandler(eventService)
|
||||||
|
secretHandler := newSecretHandler(secretService)
|
||||||
|
actionHandler := newActionHandler(actionService)
|
||||||
|
actionInvocationHandler := newActionInvocationHandler(actionService, actionInvocationService)
|
||||||
|
|
||||||
if err = ts.init(); err != nil {
|
infoHandler := newInfoHandler(env.appConfig)
|
||||||
zap.L().Sugar().Fatalf("Task service initialization failed: %v", err)
|
healthHandler := newHealthHandler()
|
||||||
}
|
authHandler := newAuthHandler()
|
||||||
|
|
||||||
ts.start()
|
|
||||||
|
|
||||||
uh := newUpdateHandler(us, env.appConfig)
|
|
||||||
wh := newWebhookHandler(ws)
|
|
||||||
wih := newWebhookInvocationHandler(wis, ws)
|
|
||||||
eh := newEventHandler(es)
|
|
||||||
sh := newSecretHandler(ss)
|
|
||||||
ah := newActionHandler(as)
|
|
||||||
aih := newActionInvocationHandler(as, ais)
|
|
||||||
|
|
||||||
ih := newInfoHandler(env.appConfig)
|
|
||||||
hh := newHealthHandler()
|
|
||||||
authH := newAuthHandler()
|
|
||||||
|
|
||||||
router.Use(middlewareAppName())
|
router.Use(middlewareAppName())
|
||||||
router.Use(middlewareAppVersion())
|
router.Use(middlewareAppVersion())
|
||||||
|
@ -114,10 +101,10 @@ func Start() {
|
||||||
}))
|
}))
|
||||||
|
|
||||||
apiPublicGroup := router.Group("/api/v1")
|
apiPublicGroup := router.Group("/api/v1")
|
||||||
apiPublicGroup.GET("/health", hh.show)
|
apiPublicGroup.GET("/health", healthHandler.show)
|
||||||
apiPublicGroup.GET("/info", ih.show)
|
apiPublicGroup.GET("/info", infoHandler.show)
|
||||||
|
|
||||||
apiPublicGroup.POST("/webhooks/:id", wih.execute)
|
apiPublicGroup.POST("/webhooks/:id", webhookInvocationHandler.execute)
|
||||||
|
|
||||||
var authMethodHandler gin.HandlerFunc
|
var authMethodHandler gin.HandlerFunc
|
||||||
|
|
||||||
|
@ -133,47 +120,48 @@ func Start() {
|
||||||
|
|
||||||
apiAuthGroup := router.Group("/api/v1", authMethodHandler)
|
apiAuthGroup := router.Group("/api/v1", authMethodHandler)
|
||||||
|
|
||||||
apiAuthGroup.GET("/login", authH.login)
|
apiAuthGroup.GET("/login", authHandler.login)
|
||||||
|
|
||||||
apiAuthGroup.GET("/updates", uh.paginate)
|
apiAuthGroup.GET("/updates", updateHandler.paginate)
|
||||||
apiAuthGroup.GET("/updates/:id", uh.get)
|
apiAuthGroup.GET("/updates/:id", updateHandler.get)
|
||||||
apiAuthGroup.PATCH("/updates/:id/state", uh.updateState)
|
apiAuthGroup.PATCH("/updates/:id/state", updateHandler.updateState)
|
||||||
apiAuthGroup.DELETE("/updates/:id", uh.delete)
|
apiAuthGroup.DELETE("/updates/:id", updateHandler.delete)
|
||||||
|
|
||||||
apiAuthGroup.GET("/webhooks", wh.paginate)
|
apiAuthGroup.GET("/webhooks", webhookHandler.paginate)
|
||||||
apiAuthGroup.POST("/webhooks", wh.create)
|
apiAuthGroup.POST("/webhooks", webhookHandler.create)
|
||||||
apiAuthGroup.GET("/webhooks/:id", wh.get)
|
apiAuthGroup.GET("/webhooks/:id", webhookHandler.get)
|
||||||
apiAuthGroup.PATCH("/webhooks/:id/label", wh.updateLabel)
|
apiAuthGroup.PATCH("/webhooks/:id/label", webhookHandler.updateLabel)
|
||||||
apiAuthGroup.PATCH("/webhooks/:id/ignore-host", wh.updateIgnoreHost)
|
apiAuthGroup.PATCH("/webhooks/:id/ignore-host", webhookHandler.updateIgnoreHost)
|
||||||
apiAuthGroup.DELETE("/webhooks/:id", wh.delete)
|
apiAuthGroup.DELETE("/webhooks/:id", webhookHandler.delete)
|
||||||
|
|
||||||
apiAuthGroup.GET("/events", eh.window)
|
apiAuthGroup.GET("/events", eventHandler.window)
|
||||||
apiAuthGroup.GET("/events/:id", eh.get)
|
apiAuthGroup.GET("/events/:id", eventHandler.get)
|
||||||
apiAuthGroup.DELETE("/events/:id", eh.delete)
|
apiAuthGroup.DELETE("/events/:id", eventHandler.delete)
|
||||||
|
|
||||||
apiAuthGroup.GET("/secrets", sh.getAll)
|
apiAuthGroup.GET("/secrets", secretHandler.getAll)
|
||||||
apiAuthGroup.GET("/secrets/:id", sh.get)
|
apiAuthGroup.GET("/secrets/:id", secretHandler.get)
|
||||||
apiAuthGroup.POST("/secrets", sh.create)
|
apiAuthGroup.POST("/secrets", secretHandler.create)
|
||||||
apiAuthGroup.PATCH("/secrets/:id/value", sh.updateValue)
|
apiAuthGroup.PATCH("/secrets/:id/value", secretHandler.updateValue)
|
||||||
apiAuthGroup.DELETE("/secrets/:id", sh.delete)
|
apiAuthGroup.DELETE("/secrets/:id", secretHandler.delete)
|
||||||
|
|
||||||
apiAuthGroup.GET("/actions", ah.paginate)
|
apiAuthGroup.GET("/actions", actionHandler.paginate)
|
||||||
apiAuthGroup.POST("/actions", ah.create)
|
apiAuthGroup.POST("/actions", actionHandler.create)
|
||||||
apiAuthGroup.GET("/actions/:id", ah.get)
|
apiAuthGroup.GET("/actions/:id", actionHandler.get)
|
||||||
apiAuthGroup.PATCH("/actions/:id/label", ah.updateLabel)
|
apiAuthGroup.PATCH("/actions/:id/label", actionHandler.updateLabel)
|
||||||
apiAuthGroup.PATCH("/actions/:id/match-event", ah.updateMatchEvent)
|
apiAuthGroup.PATCH("/actions/:id/match-event", actionHandler.updateMatchEvent)
|
||||||
apiAuthGroup.PATCH("/actions/:id/match-host", ah.updateMatchHost)
|
apiAuthGroup.PATCH("/actions/:id/match-host", actionHandler.updateMatchHost)
|
||||||
apiAuthGroup.PATCH("/actions/:id/match-application", ah.updateMatchApplication)
|
apiAuthGroup.PATCH("/actions/:id/match-application", actionHandler.updateMatchApplication)
|
||||||
apiAuthGroup.PATCH("/actions/:id/match-provider", ah.updateMatchProvider)
|
apiAuthGroup.PATCH("/actions/:id/match-provider", actionHandler.updateMatchProvider)
|
||||||
apiAuthGroup.PATCH("/actions/:id/payload", ah.updatePayload)
|
apiAuthGroup.PATCH("/actions/:id/payload", actionHandler.updatePayload)
|
||||||
apiAuthGroup.PATCH("/actions/:id/enabled", ah.updateEnabled)
|
apiAuthGroup.PATCH("/actions/:id/enabled", actionHandler.updateEnabled)
|
||||||
apiAuthGroup.DELETE("/actions/:id", ah.delete)
|
apiAuthGroup.DELETE("/actions/:id", actionHandler.delete)
|
||||||
apiAuthGroup.POST("/actions/:id/test", aih.test)
|
apiAuthGroup.POST("/actions/:id/test", actionInvocationHandler.test)
|
||||||
|
|
||||||
apiAuthGroup.GET("/action-invocations", aih.paginate)
|
apiAuthGroup.GET("/action-invocations", actionInvocationHandler.paginate)
|
||||||
apiAuthGroup.GET("/action-invocations/:id", aih.get)
|
apiAuthGroup.GET("/action-invocations/:id", actionInvocationHandler.get)
|
||||||
apiAuthGroup.DELETE("/action-invocations/:id", aih.delete)
|
apiAuthGroup.DELETE("/action-invocations/:id", actionInvocationHandler.delete)
|
||||||
|
|
||||||
|
// start server
|
||||||
serverAddress := fmt.Sprintf("%s:%d", env.serverConfig.listen, env.serverConfig.port)
|
serverAddress := fmt.Sprintf("%s:%d", env.serverConfig.listen, env.serverConfig.port)
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
Addr: serverAddress,
|
Addr: serverAddress,
|
||||||
|
@ -181,16 +169,16 @@ func Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var e error
|
var err error
|
||||||
|
|
||||||
if env.serverConfig.tlsEnabled {
|
if env.serverConfig.tlsEnabled {
|
||||||
e = srv.ListenAndServeTLS(env.serverConfig.tlsCertPath, env.serverConfig.tlsKeyPath)
|
err = srv.ListenAndServeTLS(env.serverConfig.tlsCertPath, env.serverConfig.tlsKeyPath)
|
||||||
} else {
|
} else {
|
||||||
e = srv.ListenAndServe()
|
err = srv.ListenAndServe()
|
||||||
}
|
}
|
||||||
|
|
||||||
if e != nil && !errors.Is(e, http.ErrServerClosed) {
|
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
zap.L().Sugar().Fatalf("Application cannot be started: %v", e)
|
zap.L().Sugar().Fatalf("Application cannot be started: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -204,11 +192,11 @@ func Start() {
|
||||||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||||
<-quit
|
<-quit
|
||||||
zap.L().Info("Shutting down...")
|
zap.L().Info("Shutting down...")
|
||||||
ts.stop()
|
taskService.stop()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), env.serverConfig.timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), env.serverConfig.timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err = srv.Shutdown(ctx); err != nil {
|
if err := srv.Shutdown(ctx); err != nil {
|
||||||
zap.L().Sugar().Fatalf("Shutdown failed, exited directly: %v", err)
|
zap.L().Sugar().Fatalf("Shutdown failed, exited directly: %v", err)
|
||||||
}
|
}
|
||||||
// catching ctx.Done() for configured timeout
|
// catching ctx.Done() for configured timeout
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
"gorm.io/gorm/clause"
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
|
|
||||||
// JSONMap defined JSON data type, need to implement driver.Valuer, sql.Scanner interface
|
// JSONMap defined JSON data type, need to implements driver.Valuer, sql.Scanner interface
|
||||||
type JSONMap map[string]interface {
|
type JSONMap map[string]interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,7 @@ func (m JSONMap) Value() (driver.Value, error) {
|
||||||
return string(ba), err
|
return string(ba), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan value into JSONB, implements sql.Scanner interface
|
// Scan scan value into Jsonb, implements sql.Scanner interface
|
||||||
func (m *JSONMap) Scan(val interface{}) error {
|
func (m *JSONMap) Scan(val interface{}) error {
|
||||||
if val == nil {
|
if val == nil {
|
||||||
*m = make(JSONMap)
|
*m = make(JSONMap)
|
||||||
|
|
|
@ -165,9 +165,7 @@ func bootstrapEnvironment() *Environment {
|
||||||
}
|
}
|
||||||
|
|
||||||
zapLogger := zap.Must(zapConfig.Build())
|
zapLogger := zap.Must(zapConfig.Build())
|
||||||
defer func(zapLogger *zap.Logger) {
|
defer zapLogger.Sync()
|
||||||
_ = zapLogger.Sync()
|
|
||||||
}(zapLogger)
|
|
||||||
zap.ReplaceGlobals(zapLogger)
|
zap.ReplaceGlobals(zapLogger)
|
||||||
|
|
||||||
// assign defaults from given environment variables and validate
|
// assign defaults from given environment variables and validate
|
||||||
|
@ -325,9 +323,7 @@ func bootstrapEnvironment() *Environment {
|
||||||
gormConfig := &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)}
|
gormConfig := &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)}
|
||||||
if isDebug && isDevelopment {
|
if isDebug && isDevelopment {
|
||||||
gormZapLogger := zap.Must(zapConfig.Build())
|
gormZapLogger := zap.Must(zapConfig.Build())
|
||||||
defer func(gormZapLogger *zap.Logger) {
|
defer gormZapLogger.Sync()
|
||||||
_ = gormZapLogger.Sync()
|
|
||||||
}(gormZapLogger)
|
|
||||||
gormLogger := zapgorm2.New(gormZapLogger)
|
gormLogger := zapgorm2.New(gormZapLogger)
|
||||||
gormConfig = &gorm.Config{Logger: gormLogger}
|
gormConfig = &gorm.Config{Logger: gormLogger}
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,8 +225,8 @@ func (r *actionInvocationDbRepo) deleteByUpdatedAtBeforeAndStates(time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func translateActionInvocationState(state ...api.ActionInvocationState) []string {
|
func translateActionInvocationState(state ...api.ActionInvocationState) []string {
|
||||||
states := make([]string, 0, len(state))
|
states := make([]string, 0)
|
||||||
if len(states) > 0 {
|
if len(state) > 0 {
|
||||||
for _, s := range state {
|
for _, s := range state {
|
||||||
states = append(states, s.Value())
|
states = append(states, s.Value())
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (r *eventDbRepo) deleteByUpdatedAtBeforeAndStates(time time.Time, state ...
|
||||||
return 0, errorValidationNotEmpty
|
return 0, errorValidationNotEmpty
|
||||||
}
|
}
|
||||||
|
|
||||||
states := make([]string, 0, len(state))
|
states := make([]string, 0)
|
||||||
for _, i := range state {
|
for _, i := range state {
|
||||||
states = append(states, i.Value())
|
states = append(states, i.Value())
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,7 +199,7 @@ func (r *updateDbRepo) deleteByUpdatedAtBeforeAndStates(time time.Time, state ..
|
||||||
return 0, errorValidationNotEmpty
|
return 0, errorValidationNotEmpty
|
||||||
}
|
}
|
||||||
|
|
||||||
states := make([]string, 0, len(state))
|
states := make([]string, 0)
|
||||||
for _, i := range state {
|
for _, i := range state {
|
||||||
states = append(states, i.Value())
|
states = append(states, i.Value())
|
||||||
}
|
}
|
||||||
|
@ -231,8 +231,8 @@ func (r *updateDbRepo) paginate(page int, pageSize int, orderBy string, order st
|
||||||
order = "desc"
|
order = "desc"
|
||||||
}
|
}
|
||||||
|
|
||||||
states := make([]string, 0, len(state))
|
states := make([]string, 0)
|
||||||
if len(states) > 0 {
|
if len(state) > 0 {
|
||||||
for _, s := range state {
|
for _, s := range state {
|
||||||
states = append(states, s.Value())
|
states = append(states, s.Value())
|
||||||
}
|
}
|
||||||
|
@ -248,8 +248,8 @@ func (r *updateDbRepo) paginate(page int, pageSize int, orderBy string, order st
|
||||||
func (r *updateDbRepo) count(searchTerm string, searchIn string, state ...api.UpdateState) (int64, error) {
|
func (r *updateDbRepo) count(searchTerm string, searchIn string, state ...api.UpdateState) (int64, error) {
|
||||||
var c int64
|
var c int64
|
||||||
|
|
||||||
states := make([]string, 0, len(state))
|
states := make([]string, 0)
|
||||||
if len(states) > 0 {
|
if len(state) > 0 {
|
||||||
for _, s := range state {
|
for _, s := range state {
|
||||||
states = append(states, s.Value())
|
states = append(states, s.Value())
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,8 +65,7 @@ func (s *actionInvocationService) enqueueFromEvent(event *Event, actions []*Acti
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
filteredActions := make([]*Action, 0)
|
var filteredActions []*Action
|
||||||
|
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
matchesEvent := action.MatchEvent == nil || *action.MatchEvent == event.Name
|
matchesEvent := action.MatchEvent == nil || *action.MatchEvent == event.Name
|
||||||
matchesHost := action.MatchHost == nil || *action.MatchHost == eventPayload.Host
|
matchesHost := action.MatchHost == nil || *action.MatchHost == eventPayload.Host
|
||||||
|
|
|
@ -17,7 +17,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newLockMemService() lockService {
|
func newLockMemService() lockService {
|
||||||
zap.L().Info("Initializing in-memory locking service")
|
zap.L().Info("Initialized in-memory locking service")
|
||||||
return &lockMemService{registry: util.NewInMemoryLockRegistry()}
|
return &lockMemService{registry: util.NewInMemoryLockRegistry()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,19 +20,19 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newLockRedisService(lc *lockConfig) (lockService, error) {
|
func newLockRedisService(lc *lockConfig) (lockService, error) {
|
||||||
zap.L().Info("Initializing REDIS locking service")
|
zap.L().Info("Initialized REDIS locking service")
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var redisOptions *redis.Options
|
var redisOptions *redis.Options
|
||||||
redisOptions, err = redis.ParseURL(lc.redisUrl)
|
redisOptions, err = redis.ParseURL(lc.redisUrl)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("lock service: cannot parse REDIS URL '%s' to set up locking: %s", lc.redisUrl, err)
|
zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
c := redis.NewClient(redisOptions)
|
c := redis.NewClient(redisOptions)
|
||||||
if err = c.Ping(context.Background()).Err(); err != nil {
|
if err = c.Ping(context.Background()).Err(); err != nil {
|
||||||
return nil, fmt.Errorf("lock service: failed to connect to REDIS: %w", err)
|
return nil, newServiceError(General, fmt.Errorf("lock service: failed to connect to REDIS. Reason: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
pool := redsyncgoredis.NewPool(c)
|
pool := redsyncgoredis.NewPool(c)
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"github.com/Depado/ginprom"
|
"github.com/Depado/ginprom"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type prometheusService struct {
|
type prometheusService struct {
|
||||||
|
@ -42,9 +42,9 @@ func newPrometheusService(r *gin.Engine, c *prometheusConfig) *prometheusService
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *prometheusService) init() error {
|
func (s *prometheusService) init() {
|
||||||
if !s.config.enabled {
|
if !s.config.enabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -59,10 +59,8 @@ func (s *prometheusService) init() error {
|
||||||
err = s.registerGaugeNoLabels(metricActions, metricActionsHelp)
|
err = s.registerGaugeNoLabels(metricActions, metricActionsHelp)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return newServiceError(General, fmt.Errorf("cannot initialize service: %w", err))
|
zap.L().Sugar().Fatalf("Cannot initialize service. Reason: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *prometheusService) registerGaugeNoLabels(name string, help string) error {
|
func (s *prometheusService) registerGaugeNoLabels(name string, help string) error {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"git.myservermanager.com/varakh/upda/api"
|
"git.myservermanager.com/varakh/upda/api"
|
||||||
redislock "github.com/go-co-op/gocron-redis-lock/v2"
|
redislock "github.com/go-co-op/gocron-redis-lock/v2"
|
||||||
"github.com/go-co-op/gocron/v2"
|
"github.com/go-co-op/gocron/v2"
|
||||||
|
@ -39,11 +38,11 @@ var (
|
||||||
initialTasksStartDelay = time.Now().Add(10 * time.Second)
|
initialTasksStartDelay = time.Now().Add(10 * time.Second)
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTaskService(u *updateService, e *eventService, w *webhookService, a *actionService, ai *actionInvocationService, l lockService, p *prometheusService, ac *appConfig, tc *taskConfig, lc *lockConfig, pc *prometheusConfig) (*taskService, error) {
|
func newTaskService(u *updateService, e *eventService, w *webhookService, a *actionService, ai *actionInvocationService, l lockService, p *prometheusService, ac *appConfig, tc *taskConfig, lc *lockConfig, pc *prometheusConfig) *taskService {
|
||||||
var err error
|
location, err := time.LoadLocation(ac.timeZone)
|
||||||
var location *time.Location
|
|
||||||
if location, err = time.LoadLocation(ac.timeZone); err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not initialize correct timezone for scheduler: %s", err)
|
zap.L().Sugar().Fatalf("Could not initialize correct timezone for scheduler. Reason: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// global job options
|
// global job options
|
||||||
|
@ -65,13 +64,13 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act
|
||||||
redisOptions, err = redis.ParseURL(lc.redisUrl)
|
redisOptions, err = redis.ParseURL(lc.redisUrl)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot parse REDIS URL '%s' to set up locking for scheduler: %s", lc.redisUrl, err)
|
zap.L().Sugar().Fatalf("Cannot parse REDIS URL '%s' to set up locking. Reason: %s", lc.redisUrl, err.Error())
|
||||||
}
|
}
|
||||||
redisClient := redis.NewClient(redisOptions)
|
redisClient := redis.NewClient(redisOptions)
|
||||||
|
|
||||||
var locker gocron.Locker
|
var locker gocron.Locker
|
||||||
if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil {
|
if locker, err = redislock.NewRedisLocker(redisClient, redislock.WithTries(1), redislock.WithExpiry(30*time.Second), redislock.WithRetryDelay(5*time.Second)); err != nil {
|
||||||
return nil, fmt.Errorf("cannot set up REDIS locker for scheduler: %s", err)
|
zap.L().Sugar().Fatalf("Cannot set up REDIS locker. Reason: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
schedulerOptions = append(schedulerOptions, gocron.WithDistributedLocker(locker))
|
schedulerOptions = append(schedulerOptions, gocron.WithDistributedLocker(locker))
|
||||||
|
@ -92,30 +91,16 @@ func newTaskService(u *updateService, e *eventService, w *webhookService, a *act
|
||||||
lockConfig: lc,
|
lockConfig: lc,
|
||||||
prometheusConfig: pc,
|
prometheusConfig: pc,
|
||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskService) init() error {
|
func (s *taskService) init() {
|
||||||
if err := s.configureCleanupStaleUpdatesTask(); err != nil {
|
s.configureCleanupStaleUpdatesTask()
|
||||||
return err
|
s.configureCleanupStaleEventsTask()
|
||||||
}
|
s.configureActionsEnqueueTask()
|
||||||
if err := s.configureCleanupStaleEventsTask(); err != nil {
|
s.configureActionsInvokeTask()
|
||||||
return err
|
s.configureCleanupStaleActionsTask()
|
||||||
}
|
s.configurePrometheusRefreshTask()
|
||||||
if err := s.configureActionsEnqueueTask(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := s.configureActionsInvokeTask(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := s.configureCleanupStaleActionsTask(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := s.configurePrometheusRefreshTask(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskService) stop() {
|
func (s *taskService) stop() {
|
||||||
|
@ -134,9 +119,9 @@ func (s *taskService) start() {
|
||||||
zap.L().Sugar().Infof("Started %d periodic tasks", len(s.scheduler.Jobs()))
|
zap.L().Sugar().Infof("Started %d periodic tasks", len(s.scheduler.Jobs()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskService) configureCleanupStaleUpdatesTask() error {
|
func (s *taskService) configureCleanupStaleUpdatesTask() {
|
||||||
if !s.taskConfig.updateCleanStaleEnabled {
|
if !s.taskConfig.updateCleanStaleEnabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
runnable := func() {
|
runnable := func() {
|
||||||
|
@ -160,15 +145,13 @@ func (s *taskService) configureCleanupStaleUpdatesTask() error {
|
||||||
|
|
||||||
scheduledJob := gocron.DurationJob(s.taskConfig.updateCleanStaleInterval)
|
scheduledJob := gocron.DurationJob(s.taskConfig.updateCleanStaleInterval)
|
||||||
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobUpdatesCleanStale)); err != nil {
|
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobUpdatesCleanStale)); err != nil {
|
||||||
return fmt.Errorf("could not create task for cleaning stale updates: %w", err)
|
zap.L().Sugar().Fatalf("Could not create task for cleaning stale updates. Reason: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (s *taskService) configureCleanupStaleEventsTask() {
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskService) configureCleanupStaleEventsTask() error {
|
|
||||||
if !s.taskConfig.eventCleanStaleEnabled {
|
if !s.taskConfig.eventCleanStaleEnabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
runnable := func() {
|
runnable := func() {
|
||||||
|
@ -192,15 +175,13 @@ func (s *taskService) configureCleanupStaleEventsTask() error {
|
||||||
|
|
||||||
scheduledJob := gocron.DurationJob(s.taskConfig.eventCleanStaleInterval)
|
scheduledJob := gocron.DurationJob(s.taskConfig.eventCleanStaleInterval)
|
||||||
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobEventsCleanStale)); err != nil {
|
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobEventsCleanStale)); err != nil {
|
||||||
return fmt.Errorf("could not create task for cleaning stale events: %w", err)
|
zap.L().Sugar().Fatalf("Could not create task for cleaning stale events. Reason: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (s *taskService) configureActionsEnqueueTask() {
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskService) configureActionsEnqueueTask() error {
|
|
||||||
if !s.taskConfig.actionsEnqueueEnabled {
|
if !s.taskConfig.actionsEnqueueEnabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
runnable := func() {
|
runnable := func() {
|
||||||
|
@ -211,15 +192,13 @@ func (s *taskService) configureActionsEnqueueTask() error {
|
||||||
|
|
||||||
scheduledJob := gocron.DurationJob(s.taskConfig.actionsEnqueueInterval)
|
scheduledJob := gocron.DurationJob(s.taskConfig.actionsEnqueueInterval)
|
||||||
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsEnqueue)); err != nil {
|
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsEnqueue)); err != nil {
|
||||||
return fmt.Errorf("could not create task for enqueueing actions: %w", err)
|
zap.L().Sugar().Fatalf("Could not create task for enqueueing actions. Reason: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (s *taskService) configureActionsInvokeTask() {
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskService) configureActionsInvokeTask() error {
|
|
||||||
if !s.taskConfig.actionsInvokeEnabled {
|
if !s.taskConfig.actionsInvokeEnabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
runnable := func() {
|
runnable := func() {
|
||||||
|
@ -230,15 +209,13 @@ func (s *taskService) configureActionsInvokeTask() error {
|
||||||
|
|
||||||
scheduledJob := gocron.DurationJob(s.taskConfig.actionsInvokeInterval)
|
scheduledJob := gocron.DurationJob(s.taskConfig.actionsInvokeInterval)
|
||||||
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsInvoke)); err != nil {
|
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsInvoke)); err != nil {
|
||||||
return fmt.Errorf("could not create task for invoking actions: %w", err)
|
zap.L().Sugar().Fatalf("Could not create task for invoking actions. Reason: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (s *taskService) configureCleanupStaleActionsTask() {
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskService) configureCleanupStaleActionsTask() error {
|
|
||||||
if !s.taskConfig.actionsCleanStaleEnabled {
|
if !s.taskConfig.actionsCleanStaleEnabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
runnable := func() {
|
runnable := func() {
|
||||||
|
@ -269,15 +246,13 @@ func (s *taskService) configureCleanupStaleActionsTask() error {
|
||||||
|
|
||||||
scheduledJob := gocron.DurationJob(s.taskConfig.actionsCleanStaleInterval)
|
scheduledJob := gocron.DurationJob(s.taskConfig.actionsCleanStaleInterval)
|
||||||
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsCleanStale)); err != nil {
|
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobActionsCleanStale)); err != nil {
|
||||||
return fmt.Errorf("could not create task for cleaning stale actions: %w", err)
|
zap.L().Sugar().Fatalf("Could not create task for cleaning stale actions. Reason: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
func (s *taskService) configurePrometheusRefreshTask() {
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskService) configurePrometheusRefreshTask() error {
|
|
||||||
if !s.prometheusConfig.enabled {
|
if !s.prometheusConfig.enabled {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
runnable := func() {
|
runnable := func() {
|
||||||
|
@ -335,8 +310,6 @@ func (s *taskService) configurePrometheusRefreshTask() error {
|
||||||
|
|
||||||
scheduledJob := gocron.DurationJob(s.taskConfig.prometheusRefreshInterval)
|
scheduledJob := gocron.DurationJob(s.taskConfig.prometheusRefreshInterval)
|
||||||
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobPrometheusRefresh)); err != nil {
|
if _, err := s.scheduler.NewJob(scheduledJob, gocron.NewTask(runnable), gocron.WithName(jobPrometheusRefresh)); err != nil {
|
||||||
return fmt.Errorf("could not create task for refreshing prometheus: %w", err)
|
zap.L().Sugar().Fatalf("Could not create task for refreshing prometheus. Reason: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,7 @@ func CreateFileWithParent(file string) error {
|
||||||
if _, err = os.Stat(file); errors.Is(err, os.ErrNotExist) {
|
if _, err = os.Stat(file); errors.Is(err, os.ErrNotExist) {
|
||||||
var f *os.File
|
var f *os.File
|
||||||
f, err = os.Create(file)
|
f, err = os.Create(file)
|
||||||
defer func(f *os.File) {
|
defer f.Close()
|
||||||
_ = f.Close()
|
|
||||||
}(f)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.New(fmt.Sprintf("cannot create file '%v': %v", file, fmt.Errorf("%w", err)))
|
return errors.New(fmt.Sprintf("cannot create file '%v': %v", file, fmt.Errorf("%w", err)))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue