feat(actions): Add basic functionality for actions and secrets (with proper asynchronous enqueue and dequeue mechanism)
Some checks failed
/ build (pull_request) Failing after 3m25s
Some checks failed
/ build (pull_request) Failing after 3m25s
This commit is contained in:
parent
82fa877b6f
commit
9d64e66125
10 changed files with 113 additions and 32 deletions
|
@ -10,11 +10,9 @@ Changes adhere to [semantic versioning](https://semver.org).
|
|||
* Switched to encrypting webhook tokens in database
|
||||
* Added _Actions_, a simple way to trigger notifications via [shoutrrr](https://containrrr.dev/shoutrrr) which supports secrets
|
||||
* Switched to producing events only for _Updates_
|
||||
|
||||
## [1.1.0] - UNRELEASED
|
||||
|
||||
* Adapted logging which defaults to JSON encoding
|
||||
* ...
|
||||
* Updated dependencies
|
||||
* Updated build to use Go 1.22
|
||||
|
||||
## [1.0.3] - 2024/01/21
|
||||
|
||||
|
|
|
@ -412,7 +412,7 @@ paths:
|
|||
required: false
|
||||
schema:
|
||||
type: string
|
||||
default: desc
|
||||
default: asc
|
||||
enum:
|
||||
- asc
|
||||
- desc
|
||||
|
@ -422,7 +422,7 @@ paths:
|
|||
required: false
|
||||
schema:
|
||||
type: string
|
||||
default: updated_at
|
||||
default: label
|
||||
enum:
|
||||
- id
|
||||
- label
|
||||
|
@ -1008,7 +1008,7 @@ paths:
|
|||
required: false
|
||||
schema:
|
||||
type: string
|
||||
default: desc
|
||||
default: asc
|
||||
enum:
|
||||
- asc
|
||||
- desc
|
||||
|
@ -1018,7 +1018,7 @@ paths:
|
|||
required: false
|
||||
schema:
|
||||
type: string
|
||||
default: updated_at
|
||||
default: label
|
||||
enum:
|
||||
- id
|
||||
- label
|
||||
|
@ -1675,7 +1675,7 @@ paths:
|
|||
required: false
|
||||
schema:
|
||||
type: string
|
||||
default: updated_at
|
||||
default: created_at
|
||||
enum:
|
||||
- id
|
||||
- state
|
||||
|
@ -2497,8 +2497,12 @@ components:
|
|||
enum:
|
||||
- created
|
||||
- running
|
||||
- retrying
|
||||
- error
|
||||
- success
|
||||
message:
|
||||
type: string
|
||||
nullable: true
|
||||
actionId:
|
||||
type: string
|
||||
eventId:
|
||||
|
|
|
@ -93,10 +93,11 @@ func (e ActionType) Value() string {
|
|||
type ActionInvocationState string
|
||||
|
||||
const (
|
||||
ActionInvocationStateCreated ActionInvocationState = "created"
|
||||
ActionInvocationStateRunning ActionInvocationState = "running"
|
||||
ActionInvocationStateSuccess ActionInvocationState = "success"
|
||||
ActionInvocationStateError ActionInvocationState = "error"
|
||||
ActionInvocationStateCreated ActionInvocationState = "created"
|
||||
ActionInvocationStateRunning ActionInvocationState = "running"
|
||||
ActionInvocationStateRetrying ActionInvocationState = "retrying"
|
||||
ActionInvocationStateSuccess ActionInvocationState = "success"
|
||||
ActionInvocationStateError ActionInvocationState = "error"
|
||||
)
|
||||
|
||||
func (e *ActionInvocationState) Scan(value interface{}) error {
|
||||
|
|
22
api/dto.go
22
api/dto.go
|
@ -91,22 +91,22 @@ type PaginateUpdateRequest struct {
|
|||
type PaginateWebhookRequest struct {
|
||||
PageSize int `form:"pageSize,default=5" binding:"numeric,gte=1"`
|
||||
Page int `form:"page,default=1" binding:"numeric,gte=1"`
|
||||
Order string `form:"order,default=desc" binding:"oneof=asc desc"`
|
||||
OrderBy string `form:"orderBy,default=updated_at" binding:"oneof=id label type created_at updated_at"`
|
||||
Order string `form:"order,default=asc" binding:"oneof=asc desc"`
|
||||
OrderBy string `form:"orderBy,default=label" binding:"oneof=id label type created_at updated_at"`
|
||||
}
|
||||
|
||||
type PaginateActionRequest struct {
|
||||
PageSize int `form:"pageSize,default=5" binding:"numeric,gte=1"`
|
||||
Page int `form:"page,default=1" binding:"numeric,gte=1"`
|
||||
Order string `form:"order,default=desc" binding:"oneof=asc desc"`
|
||||
OrderBy string `form:"orderBy,default=updated_at" binding:"oneof=id label type created_at updated_at"`
|
||||
Order string `form:"order,default=asc" binding:"oneof=asc desc"`
|
||||
OrderBy string `form:"orderBy,default=label" binding:"oneof=id label type created_at updated_at"`
|
||||
}
|
||||
|
||||
type PaginateActionInvocationRequest struct {
|
||||
PageSize int `form:"pageSize,default=5" binding:"numeric,gte=1"`
|
||||
Page int `form:"page,default=1" binding:"numeric,gte=1"`
|
||||
Order string `form:"order,default=desc" binding:"oneof=asc desc"`
|
||||
OrderBy string `form:"orderBy,default=updated_at" binding:"oneof=id state retry_count created_at updated_at"`
|
||||
OrderBy string `form:"orderBy,default=created_at" binding:"oneof=id state retry_count created_at updated_at"`
|
||||
}
|
||||
|
||||
type WebhookGenericRequest struct {
|
||||
|
@ -390,10 +390,10 @@ type ActionResponse struct {
|
|||
ID uuid.UUID `json:"id"`
|
||||
Label string `json:"label"`
|
||||
Type string `json:"type"`
|
||||
MatchEvent *string `json:"matchEvent"`
|
||||
MatchHost *string `json:"matchHost"`
|
||||
MatchApplication *string `json:"matchApplication"`
|
||||
MatchProvider *string `json:"matchProvider"`
|
||||
MatchEvent *string `json:"matchEvent,omitempty"`
|
||||
MatchHost *string `json:"matchHost,omitempty"`
|
||||
MatchApplication *string `json:"matchApplication,omitempty"`
|
||||
MatchProvider *string `json:"matchProvider,omitempty"`
|
||||
Payload interface{} `json:"payload,omitempty"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
|
@ -460,6 +460,7 @@ type ActionInvocationResponse struct {
|
|||
ID uuid.UUID `json:"id"`
|
||||
RetryCount int `json:"retryCount"`
|
||||
State string `json:"state"`
|
||||
Message *string `json:"message,omitempty"`
|
||||
ActionID string `json:"actionId"`
|
||||
EventID string `json:"eventId"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
|
@ -470,11 +471,12 @@ type ActionInvocationSingleResponse struct {
|
|||
Data ActionInvocationResponse `json:"data"`
|
||||
}
|
||||
|
||||
func NewActionInvocationSingleResponse(id uuid.UUID, retryCount int, state string, actionId string, eventId string, createdAt time.Time, updatedAt time.Time) *ActionInvocationSingleResponse {
|
||||
func NewActionInvocationSingleResponse(id uuid.UUID, retryCount int, state string, message *string, actionId string, eventId string, createdAt time.Time, updatedAt time.Time) *ActionInvocationSingleResponse {
|
||||
e := new(ActionInvocationSingleResponse)
|
||||
e.Data.ID = id
|
||||
e.Data.RetryCount = retryCount
|
||||
e.Data.State = state
|
||||
e.Data.Message = message
|
||||
e.Data.ActionID = actionId
|
||||
e.Data.EventID = eventId
|
||||
e.Data.CreatedAt = createdAt
|
||||
|
|
2
go.mod
2
go.mod
|
@ -1,6 +1,6 @@
|
|||
module git.myservermanager.com/varakh/upda
|
||||
|
||||
go 1.21
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/Depado/ginprom v1.8.1
|
||||
|
|
|
@ -63,6 +63,7 @@ func (h *actionInvocationHandler) paginate(c *gin.Context) {
|
|||
ID: e.ID,
|
||||
RetryCount: e.RetryCount,
|
||||
State: e.State,
|
||||
Message: e.Message,
|
||||
ActionID: e.ActionID,
|
||||
EventID: e.EventID,
|
||||
CreatedAt: e.CreatedAt,
|
||||
|
@ -87,7 +88,7 @@ func (h *actionInvocationHandler) get(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, api.NewActionInvocationSingleResponse(e.ID, e.RetryCount, e.State, e.ActionID, e.EventID, e.CreatedAt, e.UpdatedAt))
|
||||
c.JSON(http.StatusOK, api.NewActionInvocationSingleResponse(e.ID, e.RetryCount, e.State, e.Message, e.ActionID, e.EventID, e.CreatedAt, e.UpdatedAt))
|
||||
}
|
||||
|
||||
func (h *actionInvocationHandler) delete(c *gin.Context) {
|
||||
|
|
|
@ -167,6 +167,7 @@ type ActionInvocation struct {
|
|||
ID uuid.UUID `gorm:"type:uuid;primary_key;unique;not null"`
|
||||
RetryCount int `gorm:"not null;default:1"`
|
||||
State string `gorm:"not null"`
|
||||
Message *string
|
||||
Event Event `gorm:"foreignKey:EventID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
|
||||
EventID string `gorm:"not null"`
|
||||
Action Action `gorm:"foreignKey:ActionID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
|
||||
|
|
|
@ -13,6 +13,7 @@ type ActionInvocationRepository interface {
|
|||
findAllByState(limit int, maxRetries int, state ...api.ActionInvocationState) ([]*ActionInvocation, error)
|
||||
create(eventId string, actionId string, state api.ActionInvocationState) (*ActionInvocation, error)
|
||||
updateState(id string, state api.ActionInvocationState) (*ActionInvocation, error)
|
||||
updateMessage(id string, message *string) (*ActionInvocation, error)
|
||||
updateRetryCount(id string, retryCount int) (*ActionInvocation, error)
|
||||
delete(id string) (int64, error)
|
||||
deleteByUpdatedAtBeforeAndStates(time time.Time, retryCount int, state ...api.ActionInvocationState) (int64, error)
|
||||
|
@ -117,6 +118,31 @@ func (r *actionInvocationDbRepo) updateState(id string, state api.ActionInvocati
|
|||
return e, nil
|
||||
}
|
||||
|
||||
func (r *actionInvocationDbRepo) updateMessage(id string, message *string) (*ActionInvocation, error) {
|
||||
if id == "" {
|
||||
return nil, errorValidationNotBlank
|
||||
}
|
||||
|
||||
var err error
|
||||
var e *ActionInvocation
|
||||
|
||||
if e, err = r.find(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.Message = message
|
||||
|
||||
var res *gorm.DB
|
||||
if res = r.db.Save(&e); res.Error != nil {
|
||||
return nil, res.Error
|
||||
}
|
||||
if res.RowsAffected == 0 {
|
||||
return e, errorDatabaseRowsExpected
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func (r *actionInvocationDbRepo) delete(id string) (int64, error) {
|
||||
if id == "" {
|
||||
return 0, errorValidationNotBlank
|
||||
|
|
|
@ -79,7 +79,6 @@ func (s *actionInvocationService) enqueueFromEvent(event *Event, actions []*Acti
|
|||
|
||||
if len(filteredActions) == 0 {
|
||||
zap.L().Sugar().Debugf("No actions found which match event '%s', nothing to enqueue", event.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, action := range filteredActions {
|
||||
|
@ -108,7 +107,7 @@ func (s *actionInvocationService) invoke(batchSize int, maxRetries int) error {
|
|||
var err error
|
||||
var actionInvocations []*ActionInvocation
|
||||
|
||||
if actionInvocations, err = s.getByState(batchSize, maxRetries, api.ActionInvocationStateCreated, api.ActionInvocationStateError); err != nil {
|
||||
if actionInvocations, err = s.getByState(batchSize, maxRetries, api.ActionInvocationStateCreated, api.ActionInvocationStateRetrying); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -147,20 +146,31 @@ func (s *actionInvocationService) invoke(batchSize int, maxRetries int) error {
|
|||
}
|
||||
|
||||
if err = s.execute(action, eventPayload); err != nil {
|
||||
var cause error
|
||||
cause = err
|
||||
|
||||
zap.L().Sugar().Errorf("Could not invoke action '%s' (%v) for action invocation '%v'. Reason: %s", action.Label, action.ID, actionInvocation.ID, err.Error())
|
||||
|
||||
if _, err = s.updateState(actionInvocation.ID.String(), api.ActionInvocationStateError); err != nil {
|
||||
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as error. Reason: %s", actionInvocation.ID, err.Error())
|
||||
var newState api.ActionInvocationState
|
||||
newRetryCount := actionInvocation.RetryCount + 1
|
||||
newState = api.ActionInvocationStateRetrying
|
||||
|
||||
if newRetryCount >= maxRetries {
|
||||
zap.L().Sugar().Infof("Action invocation '%v' exceeded max retry count of '%d'. Not trying again.", actionInvocation.ID, newRetryCount)
|
||||
newState = api.ActionInvocationStateError
|
||||
}
|
||||
|
||||
newRetryCount := actionInvocation.RetryCount + 1
|
||||
if _, err = s.updateState(actionInvocation.ID.String(), newState); err != nil {
|
||||
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as '%v'. Reason: %s", actionInvocation.ID, newState, err.Error())
|
||||
}
|
||||
|
||||
if _, err = s.updateRetryCount(actionInvocation.ID.String(), newRetryCount); err != nil {
|
||||
zap.L().Sugar().Errorf("Could not update action invocation '%v' retry count to '%d'. Reason: %s", actionInvocation.ID, newRetryCount, err.Error())
|
||||
}
|
||||
|
||||
if newRetryCount >= maxRetries {
|
||||
zap.L().Sugar().Infof("Action invocation '%v' exceeded max retry count of '%d'. Not trying again.", actionInvocation.ID, newRetryCount)
|
||||
msg := cause.Error()
|
||||
if _, err = s.updateMessage(actionInvocation.ID.String(), &msg); err != nil {
|
||||
zap.L().Sugar().Errorf("Could not update action invocation '%v' message. Reason: %s", actionInvocation.ID, err.Error())
|
||||
}
|
||||
|
||||
continue
|
||||
|
@ -170,6 +180,9 @@ func (s *actionInvocationService) invoke(batchSize int, maxRetries int) error {
|
|||
if _, err = s.updateState(actionInvocation.ID.String(), api.ActionInvocationStateSuccess); err != nil {
|
||||
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as success. Reason: %s", actionInvocation.ID, err.Error())
|
||||
}
|
||||
if _, err = s.updateMessage(actionInvocation.ID.String(), nil); err != nil {
|
||||
zap.L().Sugar().Errorf("Could not update action invocation '%v' message. Reason: %s", actionInvocation.ID, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -322,6 +335,26 @@ func (s *actionInvocationService) updateState(id string, state api.ActionInvocat
|
|||
return e, nil
|
||||
}
|
||||
|
||||
func (s *actionInvocationService) updateMessage(id string, message *string) (*ActionInvocation, error) {
|
||||
if id == "" {
|
||||
return nil, errorValidationNotBlank
|
||||
}
|
||||
|
||||
var e *ActionInvocation
|
||||
var err error
|
||||
|
||||
if e, err = s.get(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if e, err = s.repo.updateMessage(id, message); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
zap.L().Sugar().Infof("Modified action invocation '%v'", id)
|
||||
return e, nil
|
||||
}
|
||||
|
||||
func (s *actionInvocationService) updateRetryCount(id string, retryCount int) (*ActionInvocation, error) {
|
||||
if id == "" {
|
||||
return nil, errorValidationNotBlank
|
||||
|
|
|
@ -218,8 +218,23 @@ func (s *eventService) extractPayloadInfo(event *Event) (*eventPayloadInformatio
|
|||
}
|
||||
return &eventPayloadInformationDto{Host: p.Host, Application: p.Application, Provider: p.Provider, Version: p.Version}, nil
|
||||
case api.EventNameUpdateUpdatedApproved.Value():
|
||||
var p api.EventPayloadUpdateUpdatedDto
|
||||
if p, err = util.UnmarshalGenericJSON[api.EventPayloadUpdateUpdatedDto](bytes); err != nil {
|
||||
return nil, newServiceError(General, err)
|
||||
}
|
||||
return &eventPayloadInformationDto{Host: p.Host, Application: p.Application, Provider: p.Provider, Version: p.Version}, nil
|
||||
case api.EventNameUpdateUpdatedPending.Value():
|
||||
var p api.EventPayloadUpdateUpdatedDto
|
||||
if p, err = util.UnmarshalGenericJSON[api.EventPayloadUpdateUpdatedDto](bytes); err != nil {
|
||||
return nil, newServiceError(General, err)
|
||||
}
|
||||
return &eventPayloadInformationDto{Host: p.Host, Application: p.Application, Provider: p.Provider, Version: p.Version}, nil
|
||||
case api.EventNameUpdateUpdatedIgnored.Value():
|
||||
var p api.EventPayloadUpdateUpdatedDto
|
||||
if p, err = util.UnmarshalGenericJSON[api.EventPayloadUpdateUpdatedDto](bytes); err != nil {
|
||||
return nil, newServiceError(General, err)
|
||||
}
|
||||
return &eventPayloadInformationDto{Host: p.Host, Application: p.Application, Provider: p.Provider, Version: p.Version}, nil
|
||||
case api.EventNameUpdateUpdated.Value():
|
||||
var p api.EventPayloadUpdateUpdatedDto
|
||||
if p, err = util.UnmarshalGenericJSON[api.EventPayloadUpdateUpdatedDto](bytes); err != nil {
|
||||
|
|
Loading…
Reference in a new issue