265 lines
6.4 KiB
Go
265 lines
6.4 KiB
Go
package server
|
|
|
|
import (
|
|
"git.myservermanager.com/varakh/upda/api"
|
|
"gorm.io/gorm"
|
|
"time"
|
|
)
|
|
|
|
type ActionInvocationRepository interface {
|
|
paginate(page int, pageSize int, orderBy string, order string) ([]*ActionInvocation, error)
|
|
count() (int64, error)
|
|
find(id string) (*ActionInvocation, error)
|
|
findAllByState(limit int, maxRetries int, state ...api.ActionInvocationState) ([]*ActionInvocation, error)
|
|
create(eventId string, actionId string, state api.ActionInvocationState) (*ActionInvocation, error)
|
|
updateState(id string, state api.ActionInvocationState) (*ActionInvocation, error)
|
|
updateMessage(id string, message *string) (*ActionInvocation, error)
|
|
updateRetryCount(id string, retryCount int) (*ActionInvocation, error)
|
|
delete(id string) (int64, error)
|
|
deleteByUpdatedAtBeforeAndStates(time time.Time, retryCount int, state ...api.ActionInvocationState) (int64, error)
|
|
}
|
|
|
|
type actionInvocationDbRepo struct {
|
|
db *gorm.DB
|
|
}
|
|
|
|
func newActionInvocationDbRepo(db *gorm.DB) *actionInvocationDbRepo {
|
|
return &actionInvocationDbRepo{
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) find(id string) (*ActionInvocation, error) {
|
|
if id == "" {
|
|
return nil, errorValidationNotBlank
|
|
}
|
|
|
|
var e ActionInvocation
|
|
var res *gorm.DB
|
|
if res = r.db.Find(&e, "id = ?", id); res.Error != nil {
|
|
return nil, newServiceDatabaseError(res.Error)
|
|
}
|
|
if res.RowsAffected == 0 {
|
|
return nil, errorResourceNotFound
|
|
}
|
|
|
|
return &e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) create(eventId string, actionId string, state api.ActionInvocationState) (*ActionInvocation, error) {
|
|
if eventId == "" || actionId == "" || state == "" {
|
|
return nil, errorValidationNotBlank
|
|
}
|
|
|
|
e := &ActionInvocation{
|
|
EventID: eventId,
|
|
ActionID: actionId,
|
|
State: state.Value(),
|
|
}
|
|
|
|
var res *gorm.DB
|
|
if res = r.db.Create(&e); res.Error != nil {
|
|
return nil, newServiceDatabaseError(res.Error)
|
|
}
|
|
if res.RowsAffected == 0 {
|
|
return nil, errorDatabaseRowsExpected
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) updateRetryCount(id string, retryCount int) (*ActionInvocation, error) {
|
|
if id == "" {
|
|
return nil, errorValidationNotBlank
|
|
}
|
|
|
|
var err error
|
|
var e *ActionInvocation
|
|
|
|
if e, err = r.find(id); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.RetryCount = retryCount
|
|
|
|
var res *gorm.DB
|
|
if res = r.db.Save(&e); res.Error != nil {
|
|
return nil, res.Error
|
|
}
|
|
if res.RowsAffected == 0 {
|
|
return e, errorDatabaseRowsExpected
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) updateState(id string, state api.ActionInvocationState) (*ActionInvocation, error) {
|
|
if id == "" || state == "" {
|
|
return nil, errorValidationNotBlank
|
|
}
|
|
|
|
var err error
|
|
var e *ActionInvocation
|
|
|
|
if e, err = r.find(id); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.State = state.Value()
|
|
|
|
var res *gorm.DB
|
|
if res = r.db.Save(&e); res.Error != nil {
|
|
return nil, res.Error
|
|
}
|
|
if res.RowsAffected == 0 {
|
|
return e, errorDatabaseRowsExpected
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) updateMessage(id string, message *string) (*ActionInvocation, error) {
|
|
if id == "" {
|
|
return nil, errorValidationNotBlank
|
|
}
|
|
|
|
var err error
|
|
var e *ActionInvocation
|
|
|
|
if e, err = r.find(id); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.Message = message
|
|
|
|
var res *gorm.DB
|
|
if res = r.db.Save(&e); res.Error != nil {
|
|
return nil, res.Error
|
|
}
|
|
if res.RowsAffected == 0 {
|
|
return e, errorDatabaseRowsExpected
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) delete(id string) (int64, error) {
|
|
if id == "" {
|
|
return 0, errorValidationNotBlank
|
|
}
|
|
|
|
var res *gorm.DB
|
|
if res = r.db.Delete(&ActionInvocation{}, "id = ?", id); res.Error != nil {
|
|
return 0, newServiceDatabaseError(res.Error)
|
|
}
|
|
|
|
return res.RowsAffected, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) paginate(page int, pageSize int, orderBy string, order string) ([]*ActionInvocation, error) {
|
|
if page == 0 {
|
|
return nil, errorValidationPageGreaterZero
|
|
}
|
|
if pageSize <= 0 {
|
|
return nil, errorValidationPageSizeGreaterZero
|
|
}
|
|
|
|
offset := (page - 1) * pageSize
|
|
|
|
var e []*ActionInvocation
|
|
var res *gorm.DB
|
|
|
|
if orderBy != "" && order != "" {
|
|
res = r.db.Order(orderBy + " " + order).Offset(offset).Limit(pageSize).Find(&e)
|
|
} else {
|
|
res = r.db.Offset(offset).Limit(pageSize).Find(&e)
|
|
}
|
|
|
|
if res.Error != nil {
|
|
return nil, newServiceDatabaseError(res.Error)
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) count() (int64, error) {
|
|
var c int64
|
|
var res *gorm.DB
|
|
|
|
if res = r.db.Model(&ActionInvocation{}).Count(&c); res.Error != nil {
|
|
return 0, newServiceDatabaseError(res.Error)
|
|
}
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) findAllByState(limit int, maxRetries int, state ...api.ActionInvocationState) ([]*ActionInvocation, error) {
|
|
if limit <= 0 {
|
|
return nil, errorValidationLimitGreaterZero
|
|
}
|
|
|
|
var e []*ActionInvocation
|
|
|
|
states := translateActionInvocationState(state...)
|
|
|
|
if res := r.db.Model(&ActionInvocation{}).Scopes(allGetActionInvocationCriterion(states, maxRetries)).Order("created_at asc").Limit(limit).Find(&e); res.Error != nil {
|
|
return nil, newServiceDatabaseError(res.Error)
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (r *actionInvocationDbRepo) deleteByUpdatedAtBeforeAndStates(time time.Time, maxRetries int, state ...api.ActionInvocationState) (int64, error) {
|
|
if len(state) == 0 {
|
|
return 0, errorValidationNotEmpty
|
|
}
|
|
|
|
states := translateActionInvocationState(state...)
|
|
|
|
var res *gorm.DB
|
|
if res = r.db.Where("retry_count >= ?", maxRetries).Where("state IN ?", states).Where("updated_at < ?", time).Delete(&ActionInvocation{}); res.Error != nil {
|
|
return 0, newServiceDatabaseError(res.Error)
|
|
}
|
|
|
|
return res.RowsAffected, nil
|
|
}
|
|
|
|
func translateActionInvocationState(state ...api.ActionInvocationState) []string {
|
|
states := make([]string, 0)
|
|
if len(state) > 0 {
|
|
for _, s := range state {
|
|
states = append(states, s.Value())
|
|
}
|
|
}
|
|
|
|
return states
|
|
}
|
|
|
|
func criterionActonInvocationMaxRetries(maxRetries int) func(db *gorm.DB) *gorm.DB {
|
|
if maxRetries > 0 {
|
|
return func(db *gorm.DB) *gorm.DB {
|
|
return db.Where("retry_count < ? ", maxRetries)
|
|
}
|
|
}
|
|
|
|
return func(db *gorm.DB) *gorm.DB {
|
|
return db
|
|
}
|
|
}
|
|
|
|
func criterionActionInvocationState(states []string) func(db *gorm.DB) *gorm.DB {
|
|
if states != nil && len(states) > 0 {
|
|
return func(db *gorm.DB) *gorm.DB {
|
|
return db.Where("state IN (?)", states)
|
|
}
|
|
}
|
|
return func(db *gorm.DB) *gorm.DB {
|
|
return db
|
|
}
|
|
}
|
|
|
|
func allGetActionInvocationCriterion(states []string, maxRetries int) func(db *gorm.DB) *gorm.DB {
|
|
return func(db *gorm.DB) *gorm.DB {
|
|
return db.Scopes(criterionActionInvocationState(states), criterionActonInvocationMaxRetries(maxRetries))
|
|
}
|
|
}
|