feat(actions): Add basic functionality for actions and secrets (with proper asynchronous enqueue and dequeue mechanism)
All checks were successful
/ build (pull_request) Successful in 3m16s
All checks were successful
/ build (pull_request) Successful in 3m16s
This commit is contained in:
parent
82fa877b6f
commit
2076a7b76f
6 changed files with 70 additions and 6 deletions
|
@ -2499,6 +2499,9 @@ components:
|
||||||
- running
|
- running
|
||||||
- error
|
- error
|
||||||
- success
|
- success
|
||||||
|
message:
|
||||||
|
type: string
|
||||||
|
nullable: true
|
||||||
actionId:
|
actionId:
|
||||||
type: string
|
type: string
|
||||||
eventId:
|
eventId:
|
||||||
|
|
12
api/dto.go
12
api/dto.go
|
@ -390,10 +390,10 @@ type ActionResponse struct {
|
||||||
ID uuid.UUID `json:"id"`
|
ID uuid.UUID `json:"id"`
|
||||||
Label string `json:"label"`
|
Label string `json:"label"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
MatchEvent *string `json:"matchEvent"`
|
MatchEvent *string `json:"matchEvent,omitempty"`
|
||||||
MatchHost *string `json:"matchHost"`
|
MatchHost *string `json:"matchHost,omitempty"`
|
||||||
MatchApplication *string `json:"matchApplication"`
|
MatchApplication *string `json:"matchApplication,omitempty"`
|
||||||
MatchProvider *string `json:"matchProvider"`
|
MatchProvider *string `json:"matchProvider,omitempty"`
|
||||||
Payload interface{} `json:"payload,omitempty"`
|
Payload interface{} `json:"payload,omitempty"`
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
UpdatedAt time.Time `json:"updatedAt"`
|
UpdatedAt time.Time `json:"updatedAt"`
|
||||||
|
@ -460,6 +460,7 @@ type ActionInvocationResponse struct {
|
||||||
ID uuid.UUID `json:"id"`
|
ID uuid.UUID `json:"id"`
|
||||||
RetryCount int `json:"retryCount"`
|
RetryCount int `json:"retryCount"`
|
||||||
State string `json:"state"`
|
State string `json:"state"`
|
||||||
|
Message *string `json:"message,omitempty"`
|
||||||
ActionID string `json:"actionId"`
|
ActionID string `json:"actionId"`
|
||||||
EventID string `json:"eventId"`
|
EventID string `json:"eventId"`
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
@ -470,11 +471,12 @@ type ActionInvocationSingleResponse struct {
|
||||||
Data ActionInvocationResponse `json:"data"`
|
Data ActionInvocationResponse `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewActionInvocationSingleResponse(id uuid.UUID, retryCount int, state string, actionId string, eventId string, createdAt time.Time, updatedAt time.Time) *ActionInvocationSingleResponse {
|
func NewActionInvocationSingleResponse(id uuid.UUID, retryCount int, state string, message *string, actionId string, eventId string, createdAt time.Time, updatedAt time.Time) *ActionInvocationSingleResponse {
|
||||||
e := new(ActionInvocationSingleResponse)
|
e := new(ActionInvocationSingleResponse)
|
||||||
e.Data.ID = id
|
e.Data.ID = id
|
||||||
e.Data.RetryCount = retryCount
|
e.Data.RetryCount = retryCount
|
||||||
e.Data.State = state
|
e.Data.State = state
|
||||||
|
e.Data.Message = message
|
||||||
e.Data.ActionID = actionId
|
e.Data.ActionID = actionId
|
||||||
e.Data.EventID = eventId
|
e.Data.EventID = eventId
|
||||||
e.Data.CreatedAt = createdAt
|
e.Data.CreatedAt = createdAt
|
||||||
|
|
|
@ -63,6 +63,7 @@ func (h *actionInvocationHandler) paginate(c *gin.Context) {
|
||||||
ID: e.ID,
|
ID: e.ID,
|
||||||
RetryCount: e.RetryCount,
|
RetryCount: e.RetryCount,
|
||||||
State: e.State,
|
State: e.State,
|
||||||
|
Message: e.Message,
|
||||||
ActionID: e.ActionID,
|
ActionID: e.ActionID,
|
||||||
EventID: e.EventID,
|
EventID: e.EventID,
|
||||||
CreatedAt: e.CreatedAt,
|
CreatedAt: e.CreatedAt,
|
||||||
|
@ -87,7 +88,7 @@ func (h *actionInvocationHandler) get(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.JSON(http.StatusOK, api.NewActionInvocationSingleResponse(e.ID, e.RetryCount, e.State, e.ActionID, e.EventID, e.CreatedAt, e.UpdatedAt))
|
c.JSON(http.StatusOK, api.NewActionInvocationSingleResponse(e.ID, e.RetryCount, e.State, e.Message, e.ActionID, e.EventID, e.CreatedAt, e.UpdatedAt))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *actionInvocationHandler) delete(c *gin.Context) {
|
func (h *actionInvocationHandler) delete(c *gin.Context) {
|
||||||
|
|
|
@ -167,6 +167,7 @@ type ActionInvocation struct {
|
||||||
ID uuid.UUID `gorm:"type:uuid;primary_key;unique;not null"`
|
ID uuid.UUID `gorm:"type:uuid;primary_key;unique;not null"`
|
||||||
RetryCount int `gorm:"not null;default:1"`
|
RetryCount int `gorm:"not null;default:1"`
|
||||||
State string `gorm:"not null"`
|
State string `gorm:"not null"`
|
||||||
|
Message *string
|
||||||
Event Event `gorm:"foreignKey:EventID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
|
Event Event `gorm:"foreignKey:EventID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
|
||||||
EventID string `gorm:"not null"`
|
EventID string `gorm:"not null"`
|
||||||
Action Action `gorm:"foreignKey:ActionID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
|
Action Action `gorm:"foreignKey:ActionID;references:ID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE;"`
|
||||||
|
|
|
@ -13,6 +13,7 @@ type ActionInvocationRepository interface {
|
||||||
findAllByState(limit int, maxRetries int, state ...api.ActionInvocationState) ([]*ActionInvocation, error)
|
findAllByState(limit int, maxRetries int, state ...api.ActionInvocationState) ([]*ActionInvocation, error)
|
||||||
create(eventId string, actionId string, state api.ActionInvocationState) (*ActionInvocation, error)
|
create(eventId string, actionId string, state api.ActionInvocationState) (*ActionInvocation, error)
|
||||||
updateState(id string, state api.ActionInvocationState) (*ActionInvocation, error)
|
updateState(id string, state api.ActionInvocationState) (*ActionInvocation, error)
|
||||||
|
updateMessage(id string, message *string) (*ActionInvocation, error)
|
||||||
updateRetryCount(id string, retryCount int) (*ActionInvocation, error)
|
updateRetryCount(id string, retryCount int) (*ActionInvocation, error)
|
||||||
delete(id string) (int64, error)
|
delete(id string) (int64, error)
|
||||||
deleteByUpdatedAtBeforeAndStates(time time.Time, retryCount int, state ...api.ActionInvocationState) (int64, error)
|
deleteByUpdatedAtBeforeAndStates(time time.Time, retryCount int, state ...api.ActionInvocationState) (int64, error)
|
||||||
|
@ -117,6 +118,31 @@ func (r *actionInvocationDbRepo) updateState(id string, state api.ActionInvocati
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *actionInvocationDbRepo) updateMessage(id string, message *string) (*ActionInvocation, error) {
|
||||||
|
if id == "" {
|
||||||
|
return nil, errorValidationNotBlank
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var e *ActionInvocation
|
||||||
|
|
||||||
|
if e, err = r.find(id); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
e.Message = message
|
||||||
|
|
||||||
|
var res *gorm.DB
|
||||||
|
if res = r.db.Save(&e); res.Error != nil {
|
||||||
|
return nil, res.Error
|
||||||
|
}
|
||||||
|
if res.RowsAffected == 0 {
|
||||||
|
return e, errorDatabaseRowsExpected
|
||||||
|
}
|
||||||
|
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *actionInvocationDbRepo) delete(id string) (int64, error) {
|
func (r *actionInvocationDbRepo) delete(id string) (int64, error) {
|
||||||
if id == "" {
|
if id == "" {
|
||||||
return 0, errorValidationNotBlank
|
return 0, errorValidationNotBlank
|
||||||
|
|
|
@ -147,12 +147,20 @@ func (s *actionInvocationService) invoke(batchSize int, maxRetries int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.execute(action, eventPayload); err != nil {
|
if err = s.execute(action, eventPayload); err != nil {
|
||||||
|
var cause error
|
||||||
|
cause = err
|
||||||
|
|
||||||
zap.L().Sugar().Errorf("Could not invoke action '%s' (%v) for action invocation '%v'. Reason: %s", action.Label, action.ID, actionInvocation.ID, err.Error())
|
zap.L().Sugar().Errorf("Could not invoke action '%s' (%v) for action invocation '%v'. Reason: %s", action.Label, action.ID, actionInvocation.ID, err.Error())
|
||||||
|
|
||||||
if _, err = s.updateState(actionInvocation.ID.String(), api.ActionInvocationStateError); err != nil {
|
if _, err = s.updateState(actionInvocation.ID.String(), api.ActionInvocationStateError); err != nil {
|
||||||
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as error. Reason: %s", actionInvocation.ID, err.Error())
|
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as error. Reason: %s", actionInvocation.ID, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msg := cause.Error()
|
||||||
|
if _, err = s.updateMessage(actionInvocation.ID.String(), &msg); err != nil {
|
||||||
|
zap.L().Sugar().Errorf("Could not update action invocation '%v' message. Reason: %s", actionInvocation.ID, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
newRetryCount := actionInvocation.RetryCount + 1
|
newRetryCount := actionInvocation.RetryCount + 1
|
||||||
|
|
||||||
if _, err = s.updateRetryCount(actionInvocation.ID.String(), newRetryCount); err != nil {
|
if _, err = s.updateRetryCount(actionInvocation.ID.String(), newRetryCount); err != nil {
|
||||||
|
@ -170,6 +178,9 @@ func (s *actionInvocationService) invoke(batchSize int, maxRetries int) error {
|
||||||
if _, err = s.updateState(actionInvocation.ID.String(), api.ActionInvocationStateSuccess); err != nil {
|
if _, err = s.updateState(actionInvocation.ID.String(), api.ActionInvocationStateSuccess); err != nil {
|
||||||
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as success. Reason: %s", actionInvocation.ID, err.Error())
|
zap.L().Sugar().Errorf("Could not mark action invocation '%v' as success. Reason: %s", actionInvocation.ID, err.Error())
|
||||||
}
|
}
|
||||||
|
if _, err = s.updateMessage(actionInvocation.ID.String(), nil); err != nil {
|
||||||
|
zap.L().Sugar().Errorf("Could not update action invocation '%v' message. Reason: %s", actionInvocation.ID, err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -322,6 +333,26 @@ func (s *actionInvocationService) updateState(id string, state api.ActionInvocat
|
||||||
return e, nil
|
return e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *actionInvocationService) updateMessage(id string, message *string) (*ActionInvocation, error) {
|
||||||
|
if id == "" {
|
||||||
|
return nil, errorValidationNotBlank
|
||||||
|
}
|
||||||
|
|
||||||
|
var e *ActionInvocation
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if e, err = s.get(id); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, err = s.repo.updateMessage(id, message); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Sugar().Infof("Modified action invocation '%v'", id)
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *actionInvocationService) updateRetryCount(id string, retryCount int) (*ActionInvocation, error) {
|
func (s *actionInvocationService) updateRetryCount(id string, retryCount int) (*ActionInvocation, error) {
|
||||||
if id == "" {
|
if id == "" {
|
||||||
return nil, errorValidationNotBlank
|
return nil, errorValidationNotBlank
|
||||||
|
|
Loading…
Reference in a new issue