From 24e85df7e5e79e66f06c4ab930140b44ada48e22 Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Wed, 3 Jul 2024 10:44:40 -0400 Subject: [PATCH] Revert ":ghost: Tasking find refs (#673)" This reverts commit 9b4cc38f5b4a39ab8f88e4df99e695b37f593683. --- api/application.go | 50 +++++---- api/base.go | 41 +++++++ api/identity.go | 20 ++-- api/task.go | 93 +++++++++++++++- api/taskgroup.go | 22 ++-- cmd/main.go | 4 + importer/manager.go | 37 +++++-- k8s/api/tackle/v1alpha2/task.go | 12 -- migration/json/fields.go | 86 -------------- migration/json/pkg.go | 6 - migration/v14/model/core.go | 63 ++++++----- model/pkg.go | 7 +- settings/hub.go | 76 ++++++++++--- task/error.go | 14 +++ task/manager.go | 191 +++++++++++++------------------- test/api/task/api_test.go | 14 +-- trigger/application.go | 42 ------- trigger/identity.go | 28 ----- trigger/pkg.go | 53 --------- 19 files changed, 401 insertions(+), 458 deletions(-) delete mode 100644 migration/json/fields.go delete mode 100644 migration/json/pkg.go delete mode 100644 trigger/application.go delete mode 100644 trigger/identity.go delete mode 100644 trigger/pkg.go diff --git a/api/application.go b/api/application.go index 4384a9c8..b2f1fd57 100644 --- a/api/application.go +++ b/api/application.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "fmt" "net/http" "sort" "strings" @@ -10,7 +11,7 @@ import ( "github.com/konveyor/tackle2-hub/assessment" "github.com/konveyor/tackle2-hub/metrics" "github.com/konveyor/tackle2-hub/model" - "github.com/konveyor/tackle2-hub/trigger" + tasking "github.com/konveyor/tackle2-hub/task" "gorm.io/gorm/clause" ) @@ -249,20 +250,11 @@ func (h ApplicationHandler) Create(ctx *gin.Context) { return } - rtx := WithContext(ctx) - tr := trigger.Application{ - Trigger: trigger.Trigger{ - TaskManager: rtx.TaskManager, - Client: rtx.Client, - DB: h.DB(ctx), - }, - } - err = tr.Created(m) + err = h.discover(ctx, m) if err != nil { _ = ctx.Error(err) return } - h.Respond(ctx, http.StatusCreated, r) } @@ -388,20 +380,11 @@ func (h ApplicationHandler) Update(ctx *gin.Context) { } } - rtx := WithContext(ctx) - tr := trigger.Application{ - Trigger: trigger.Trigger{ - TaskManager: rtx.TaskManager, - Client: rtx.Client, - DB: h.DB(ctx), - }, - } - err = tr.Updated(m) + err = h.discover(ctx, m) if err != nil { _ = ctx.Error(err) return } - h.Status(ctx, http.StatusNoContent) } @@ -1091,6 +1074,31 @@ func (h ApplicationHandler) AssessmentCreate(ctx *gin.Context) { h.Respond(ctx, http.StatusCreated, r) } +// discover an application's language and frameworks by launching discovery tasks. +func (h ApplicationHandler) discover(ctx *gin.Context, application *model.Application) (err error) { + rtx := WithContext(ctx) + db := h.DB(ctx) + for _, kind := range Settings.Hub.Discovery.Tasks { + t := Task{} + t.Kind = kind + t.Name = fmt.Sprintf("%s-%s", application.Name, kind) + ref := Ref{ID: application.ID} + t.Application = &ref + t.State = tasking.Ready + taskHandler := TaskHandler{} + err = taskHandler.FindRefs(rtx.Client, &t) + if err != nil { + return + } + task := tasking.Task{Task: t.Model()} + err = rtx.TaskManager.Create(db, &task) + if err != nil { + return + } + } + return +} + // Application REST resource. type Application struct { Resource `yaml:",inline"` diff --git a/api/base.go b/api/base.go index bc19ce9a..186c894d 100644 --- a/api/base.go +++ b/api/base.go @@ -250,6 +250,47 @@ func (h *BaseHandler) Attachment(ctx *gin.Context, name string) { attachment) } +// Merge maps B into A. +// The B map is the authority. +func (h *BaseHandler) Merge(a, b map[string]any) (out map[string]any) { + if a == nil { + a = map[string]any{} + } + if b == nil { + b = map[string]any{} + } + out = map[string]any{} + for k, v := range a { + out[k] = v + if bv, found := b[k]; found { + out[k] = bv + if av, cast := v.(map[string]any); cast { + if bv, cast := bv.(map[string]any); cast { + out[k] = h.Merge(av, bv) + } else { + out[k] = bv + } + } + } + } + for k, v := range b { + if _, found := a[k]; !found { + out[k] = v + } + } + + return +} + +// AsMap returns the object as a map. +func (h *BaseHandler) AsMap(object any) (mp map[string]any, isMap bool) { + b, _ := json.Marshal(object) + mp = make(map[string]any) + err := json.Unmarshal(b, &mp) + isMap = err == nil + return +} + // REST resource. type Resource struct { ID uint `json:"id,omitempty" yaml:"id,omitempty"` diff --git a/api/identity.go b/api/identity.go index 40d5bd41..ee624acc 100644 --- a/api/identity.go +++ b/api/identity.go @@ -6,7 +6,6 @@ import ( "github.com/gin-gonic/gin" "github.com/konveyor/tackle2-hub/model" - "github.com/konveyor/tackle2-hub/trigger" "gorm.io/gorm/clause" ) @@ -208,18 +207,13 @@ func (h IdentityHandler) Update(ctx *gin.Context) { return } - rtx := WithContext(ctx) - tr := trigger.Identity{ - Trigger: trigger.Trigger{ - TaskManager: rtx.TaskManager, - Client: rtx.Client, - DB: h.DB(ctx), - }, - } - err = tr.Updated(m) - if err != nil { - _ = ctx.Error(err) - return + appHandler := ApplicationHandler{} + for i := range m.Applications { + err = appHandler.discover(ctx, &m.Applications[i]) + if err != nil { + _ = ctx.Error(err) + return + } } h.Status(ctx, http.StatusNoContent) diff --git a/api/task.go b/api/task.go index 04507705..4c0d53b2 100644 --- a/api/task.go +++ b/api/task.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "io/ioutil" "net/http" @@ -11,12 +12,15 @@ import ( "github.com/gin-gonic/gin" qf "github.com/konveyor/tackle2-hub/api/filter" + crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha2" "github.com/konveyor/tackle2-hub/model" "github.com/konveyor/tackle2-hub/tar" tasking "github.com/konveyor/tackle2-hub/task" "gorm.io/gorm" "gorm.io/gorm/clause" + k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/strings/slices" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) // Routes @@ -289,6 +293,11 @@ func (h TaskHandler) Create(ctx *gin.Context) { return } rtx := WithContext(ctx) + err = h.FindRefs(rtx.Client, r) + if err != nil { + _ = ctx.Error(err) + return + } task := &tasking.Task{} task.With(r.Model()) task.CreateUser = h.BaseHandler.CurrentUser(ctx) @@ -327,7 +336,7 @@ func (h TaskHandler) Delete(ctx *gin.Context) { // @description Update a task. // @tags tasks // @accept json -// @success 200 +// @success 202 // @router /tasks/{id} [put] // @param id path int true "Task ID" // @param task body Task true "Task data" @@ -364,9 +373,7 @@ func (h TaskHandler) Update(ctx *gin.Context) { return } - r.With(m) - - h.Respond(ctx, http.StatusOK, r) + h.Status(ctx, http.StatusAccepted) } // Submit godoc @@ -374,7 +381,7 @@ func (h TaskHandler) Update(ctx *gin.Context) { // @description Patch and submit a task. // @tags tasks // @accept json -// @success 200 +// @success 202 // @router /tasks/{id}/submit [put] // @param id path int true "Task ID" // @param task body Task false "Task data (optional)" @@ -610,6 +617,82 @@ func (h TaskHandler) GetAttached(ctx *gin.Context) { } } +// FindRefs find referenced resources. +// - addon +// - extensions +// - kind +// - priority +// The priority is defaulted to the kind as needed. +func (h *TaskHandler) FindRefs(client k8sclient.Client, r *Task) (err error) { + if r.Addon != "" { + addon := &crd.Addon{} + name := r.Addon + err = client.Get( + context.TODO(), + k8sclient.ObjectKey{ + Name: name, + Namespace: Settings.Hub.Namespace, + }, + addon) + if err != nil { + if k8serr.IsNotFound(err) { + err = &BadRequestError{ + Reason: "Addon: " + name + " not found", + } + } + return + } + } + for _, name := range r.Extensions { + ext := &crd.Extension{} + err = client.Get( + context.TODO(), + k8sclient.ObjectKey{ + Name: name, + Namespace: Settings.Hub.Namespace, + }, + ext) + if err != nil { + if k8serr.IsNotFound(err) { + err = &BadRequestError{ + Reason: "Extension: " + name + " not found", + } + } + return + } + } + if r.Kind != "" { + kind := &crd.Task{} + name := r.Kind + err = client.Get( + context.TODO(), + k8sclient.ObjectKey{ + Name: name, + Namespace: Settings.Hub.Namespace, + }, + kind) + if err != nil { + if k8serr.IsNotFound(err) { + err = &BadRequestError{ + Reason: "Task: " + name + " not found", + } + } + return + } + if r.Priority == 0 { + r.Priority = kind.Spec.Priority + } + mA, castA := h.AsMap(kind.Spec.Data) + mB, castB := r.Data.(map[string]any) + if castA && castB { + r.Data = h.Merge(mA, mB) + } else { + r.Data = mA + } + } + return +} + // TTL time-to-live. type TTL model.TTL diff --git a/api/taskgroup.go b/api/taskgroup.go index 69bfc013..ff6f9ea2 100644 --- a/api/taskgroup.go +++ b/api/taskgroup.go @@ -450,11 +450,12 @@ func (h *TaskGroupHandler) findRefs(ctx *gin.Context, r *TaskGroup) (err error) if r.Priority == 0 { r.Priority = kind.Spec.Priority } - data := model.Data{Any: r.Data} - other := model.Data{Any: kind.Data()} - merged := data.Merge(other) - if !merged { - r.Data = other.Any + mA, castA := h.AsMap(kind.Spec.Data) + mB, castB := r.Data.(map[string]any) + if castA && castB { + r.Data = h.Merge(mA, mB) + } else { + r.Data = mA } } return @@ -471,9 +472,14 @@ func (h *TaskGroupHandler) Propagate(m *model.TaskGroup) (err error) { task.Policy = m.Policy task.State = m.State task.SetBucket(m.BucketID) - merged := task.Data.Merge(m.Data) - if !merged { - task.Data = m.Data + if m.Data.Any != nil { + mA, castA := m.Data.Any.(map[string]any) + mB, castB := task.Data.Any.(map[string]any) + if castA && castB { + task.Data.Any = h.Merge(mA, mB) + } else { + task.Data.Any = m.Data + } } } diff --git a/cmd/main.go b/cmd/main.go index 7ceff82c..0c3d3559 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -124,6 +124,10 @@ func main() { return } }() + err = Settings.FindDiscoveryTasks() + if err != nil { + return + } } // // k8s client. diff --git a/importer/manager.go b/importer/manager.go index b11f45e4..e8856751 100644 --- a/importer/manager.go +++ b/importer/manager.go @@ -14,7 +14,6 @@ import ( "github.com/konveyor/tackle2-hub/model" "github.com/konveyor/tackle2-hub/settings" tasking "github.com/konveyor/tackle2-hub/task" - "github.com/konveyor/tackle2-hub/trigger" "gorm.io/gorm" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -345,24 +344,38 @@ func (m *Manager) createApplication(imp *model.Import) (ok bool) { return } // best effort - tr := trigger.Application{ - Trigger: trigger.Trigger{ - TaskManager: m.TaskManager, - Client: m.Client, - DB: m.DB, - }, - } - err := tr.Created(app) + err := m.discover(app) if err != nil { - imp.ErrorMessage = fmt.Sprintf( - "Failed to launch discovery tasks for Application '%s'.", - app.Name) + imp.ErrorMessage = fmt.Sprintf("Failed to launch discovery tasks for Application '%s'", app.Name) + return } ok = true return } +func (m *Manager) discover(application *model.Application) (err error) { + for _, kind := range Settings.Hub.Discovery.Tasks { + t := api.Task{} + t.Kind = kind + t.Name = fmt.Sprintf("%s-%s", application.Name, kind) + ref := api.Ref{ID: application.ID} + t.Application = &ref + t.State = tasking.Ready + taskHandler := api.TaskHandler{} + err = taskHandler.FindRefs(m.Client, &t) + if err != nil { + return + } + task := tasking.Task{Task: t.Model()} + err = m.TaskManager.Create(m.DB, &task) + if err != nil { + return + } + } + return +} + func (m *Manager) createStakeholder(name string, email string) (stakeholder model.Stakeholder, err error) { stakeholder.Name = name stakeholder.Email = email diff --git a/k8s/api/tackle/v1alpha2/task.go b/k8s/api/tackle/v1alpha2/task.go index 2e5180dc..89261c9a 100644 --- a/k8s/api/tackle/v1alpha2/task.go +++ b/k8s/api/tackle/v1alpha2/task.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha2 import ( - "encoding/json" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -67,16 +65,6 @@ func (r *Task) HasDep(name string) (found bool) { return } -// Data returns the task Data as map[string]any. -func (r *Task) Data() (mp map[string]any) { - b := r.Spec.Data.Raw - if b == nil { - return - } - _ = json.Unmarshal(b, &mp) - return -} - // TaskList is a list of Task. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type TaskList struct { diff --git a/migration/json/fields.go b/migration/json/fields.go deleted file mode 100644 index 926aa40b..00000000 --- a/migration/json/fields.go +++ /dev/null @@ -1,86 +0,0 @@ -package json - -import "gopkg.in/yaml.v2" - -// Ref represents a FK. -type Ref struct { - ID uint `json:"id" binding:"required"` - Name string `json:"name,omitempty" yaml:",omitempty"` -} - -// Map alias. -type Map = map[string]any - -// Any alias. -type Any any - -// Data json any field. -type Data struct { - Any -} - -// Merge merges the other into self. -// Both must be a map. -func (d *Data) Merge(other Data) (merged bool) { - b, isMap := d.AsMap() - if !isMap { - return - } - a, isMap := other.AsMap() - if !isMap { - return - } - d.Any = d.merge(a, b) - merged = true - return -} - -// Merge maps B into A. -// The B map takes precedence. -func (d *Data) merge(a, b map[any]any) (out map[any]any) { - if a == nil { - a = make(map[any]any) - } - if b == nil { - b = make(map[any]any) - } - out = make(map[any]any) - for k, v := range a { - out[k] = v - if bv, found := b[k]; found { - out[k] = bv - if av, cast := v.(map[any]any); cast { - if bv, cast := bv.(map[any]any); cast { - out[k] = d.merge(av, bv) - } else { - out[k] = bv - } - } - } - } - for k, v := range b { - if _, found := a[k]; !found { - out[k] = v - } - } - - return -} - -// AsMap returns self as a map. -func (d *Data) AsMap() (mp map[any]any, isMap bool) { - if d.Any == nil { - return - } - b, err := yaml.Marshal(d.Any) - if err != nil { - return - } - mp = make(map[any]any) - err = yaml.Unmarshal(b, &mp) - if err != nil { - return - } - isMap = true - return -} diff --git a/migration/json/pkg.go b/migration/json/pkg.go deleted file mode 100644 index 74ef4ae6..00000000 --- a/migration/json/pkg.go +++ /dev/null @@ -1,6 +0,0 @@ -package json - -import "encoding/json" - -var Unmarshal = json.Unmarshal -var Marshal = json.Marshal diff --git a/migration/v14/model/core.go b/migration/v14/model/core.go index 06c02bd6..e6498433 100644 --- a/migration/v14/model/core.go +++ b/migration/v14/model/core.go @@ -1,6 +1,7 @@ package model import ( + "encoding/json" "os" "path" "time" @@ -8,7 +9,6 @@ import ( "github.com/google/uuid" liberr "github.com/jortel/go-utils/error" "github.com/konveyor/tackle2-hub/encryption" - "github.com/konveyor/tackle2-hub/migration/json" "gorm.io/gorm" ) @@ -126,7 +126,7 @@ type Task struct { Priority int Policy TaskPolicy `gorm:"type:json;serializer:json"` TTL TTL `gorm:"type:json;serializer:json"` - Data json.Data `gorm:"type:json;serializer:json"` + Data Data `gorm:"type:json;serializer:json"` Started *time.Time Terminated *time.Time Errors []TaskError `gorm:"type:json;serializer:json"` @@ -146,6 +146,40 @@ func (m *Task) BeforeCreate(db *gorm.DB) (err error) { return } +// TaskEvent task event. +type TaskEvent struct { + Kind string `json:"kind"` + Count int `json:"count"` + Reason string `json:"reason,omitempty" yaml:",omitempty"` + Last time.Time `json:"last"` +} + +// Map alias. +type Map = map[string]any + +// Any alias. +type Any any + +// Data json any field. +type Data struct { + Any +} + +// TTL time-to-live. +type TTL struct { + Created int `json:"created,omitempty" yaml:",omitempty"` + Pending int `json:"pending,omitempty" yaml:",omitempty"` + Running int `json:"running,omitempty" yaml:",omitempty"` + Succeeded int `json:"succeeded,omitempty" yaml:",omitempty"` + Failed int `json:"failed,omitempty" yaml:",omitempty"` +} + +// Ref represents a FK. +type Ref struct { + ID uint `json:"id" binding:"required"` + Name string `json:"name,omitempty" yaml:",omitempty"` +} + // TaskError used in Task.Errors. type TaskError struct { Severity string `json:"severity"` @@ -174,7 +208,7 @@ type TaskReport struct { Activity []string `gorm:"type:json;serializer:json"` Errors []TaskError `gorm:"type:json;serializer:json"` Attached []Attachment `gorm:"type:json;serializer:json" ref:"[]file"` - Result json.Data `gorm:"type:json;serializer:json"` + Result Data `gorm:"type:json;serializer:json"` TaskID uint `gorm:"<-:create;uniqueIndex"` Task *Task } @@ -189,7 +223,7 @@ type TaskGroup struct { State string Priority int Policy TaskPolicy `gorm:"type:json;serializer:json"` - Data json.Data `gorm:"type:json;serializer:json"` + Data Data `gorm:"type:json;serializer:json"` List []Task `gorm:"type:json;serializer:json"` Tasks []Task `gorm:"constraint:OnDelete:CASCADE"` } @@ -284,24 +318,3 @@ func (r *Identity) Decrypt() (err error) { } return } - -// -// JSON Fields. -// - -// TaskEvent task event. -type TaskEvent struct { - Kind string `json:"kind"` - Count int `json:"count"` - Reason string `json:"reason,omitempty" yaml:",omitempty"` - Last time.Time `json:"last"` -} - -// TTL time-to-live. -type TTL struct { - Created int `json:"created,omitempty" yaml:",omitempty"` - Pending int `json:"pending,omitempty" yaml:",omitempty"` - Running int `json:"running,omitempty" yaml:",omitempty"` - Succeeded int `json:"succeeded,omitempty" yaml:",omitempty"` - Failed int `json:"failed,omitempty" yaml:",omitempty"` -} diff --git a/model/pkg.go b/model/pkg.go index c9f799c8..6171a917 100644 --- a/model/pkg.go +++ b/model/pkg.go @@ -1,7 +1,6 @@ package model import ( - "github.com/konveyor/tackle2-hub/migration/json" "github.com/konveyor/tackle2-hub/migration/v14/model" ) @@ -48,9 +47,9 @@ type Ticket = model.Ticket type Tracker = model.Tracker type TTL = model.TTL -type Ref = json.Ref -type Map = json.Map -type Data = json.Data +type Ref = model.Ref +type Map = model.Map +type Data = model.Data type TaskError = model.TaskError type TaskEvent = model.TaskEvent diff --git a/settings/hub.go b/settings/hub.go index 192acfec..e903c476 100644 --- a/settings/hub.go +++ b/settings/hub.go @@ -1,9 +1,22 @@ package settings import ( + "context" "os" "strconv" "time" + + liberr "github.com/jortel/go-utils/error" + crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha2" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/kubernetes/scheme" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +const ( + DiscoveryLabel = "konveyor.io/discovery" ) const ( @@ -35,7 +48,6 @@ const ( EnvAnalysisReportPath = "ANALYSIS_REPORT_PATH" EnvAnalysisArchiverEnabled = "ANALYSIS_ARCHIVER_ENABLED" EnvDiscoveryEnabled = "DISCOVERY_ENABLED" - EnvDiscoveryLabel = "DISCOVERY_LABEL" ) type Hub struct { @@ -102,10 +114,9 @@ type Hub struct { ReportPath string ArchiverEnabled bool } - // Discovery settings. Discovery struct { Enabled bool - Label string + Tasks []string } } @@ -265,23 +276,58 @@ func (r *Hub) Load() (err error) { } else { r.Analysis.ArchiverEnabled = true } - s, found = os.LookupEnv(EnvDiscoveryEnabled) - if found { - b, _ := strconv.ParseBool(s) - r.Discovery.Enabled = !r.Disconnected && b - } else { - r.Discovery.Enabled = !r.Disconnected - } - s, found = os.LookupEnv(EnvDiscoveryLabel) - if found { - r.Discovery.Label = s - } else { - r.Discovery.Label = "konveyor.io/discovery" + + if !r.Disconnected { + s, found = os.LookupEnv(EnvDiscoveryEnabled) + if found { + b, _ := strconv.ParseBool(s) + r.Discovery.Enabled = b + } else { + r.Discovery.Enabled = true + } } return } +// FindDiscoveryTasks by their label. +func (r *Hub) FindDiscoveryTasks() (err error) { + if !r.Discovery.Enabled { + return + } + cfg, _ := config.GetConfig() + client, err := k8sclient.New( + cfg, + k8sclient.Options{ + Scheme: scheme.Scheme, + }) + if err != nil { + err = liberr.Wrap(err) + return + } + selector := labels.NewSelector() + req, _ := labels.NewRequirement(DiscoveryLabel, selection.Exists, []string{}) + selector = selector.Add(*req) + options := &k8sclient.ListOptions{ + Namespace: Settings.Namespace, + LabelSelector: selector, + } + list := crd.TaskList{} + err = client.List( + context.TODO(), + &list, + options) + if err != nil { + err = liberr.Wrap(err) + return + } + for i := range list.Items { + t := &list.Items[i] + r.Discovery.Tasks = append(r.Discovery.Tasks, t.Name) + } + return +} + // namespace determines the namespace. func (r *Hub) namespace() (ns string, err error) { ns, found := os.LookupEnv(EnvNamespace) diff --git a/task/error.go b/task/error.go index b6786c86..42b7320f 100644 --- a/task/error.go +++ b/task/error.go @@ -24,6 +24,20 @@ func (e *BadRequest) Is(err error) (matched bool) { return } +// ActionTimeout report an action timeout. +type ActionTimeout struct { +} + +func (e *ActionTimeout) Error() string { + return "Requested (asynchronous) action timed out." +} + +func (e *ActionTimeout) Is(err error) (matched bool) { + var inst *ActionTimeout + matched = errors.As(err, &inst) + return +} + // SoftErr returns true when the error isA SoftError. func SoftErr(err error) (matched, retry bool) { if err == nil { diff --git a/task/manager.go b/task/manager.go index c83c59d9..b67a96ae 100644 --- a/task/manager.go +++ b/task/manager.go @@ -131,10 +131,6 @@ func (m *Manager) Run(ctx context.Context) { // Create a task. func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { - err = m.findRefs(requested) - if err != nil { - return - } task := &Task{&model.Task{}} switch requested.State { case "": @@ -172,80 +168,77 @@ func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { // Update update task. func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { - task := &Task{} - err = db.First(task, requested.ID).Error - if err != nil { - return - } - switch task.State { - case Created, - Ready: - task.UpdateUser = requested.UpdateUser - task.Name = requested.Name - task.Kind = requested.Kind - task.Addon = requested.Addon - task.Extensions = requested.Extensions - task.State = requested.State - task.Locator = requested.Locator - task.Priority = requested.Priority - task.Policy = requested.Policy - task.TTL = requested.TTL - task.Data = requested.Data - task.ApplicationID = requested.ApplicationID - case Pending, - QuotaBlocked, - Postponed: - task.UpdateUser = requested.UpdateUser - task.Name = requested.Name - task.Locator = requested.Locator - task.Data = requested.Data - task.Priority = requested.Priority - task.Policy = requested.Policy - task.TTL = requested.TTL - default: - // discarded. - return - } - err = m.findRefs(task) - if err != nil { - return - } - err = db.Save(task).Error - if err != nil { - err = liberr.Wrap(err) + err = m.action(func() (err error) { + task := &Task{} + err = db.First(task, requested.ID).Error + if err != nil { + return + } + switch task.State { + case Created, + Ready: + task.UpdateUser = requested.UpdateUser + task.Name = requested.Name + task.Kind = requested.Kind + task.Addon = requested.Addon + task.Extensions = requested.Extensions + task.State = requested.State + task.Locator = requested.Locator + task.Priority = requested.Priority + task.Policy = requested.Policy + task.TTL = requested.TTL + task.Data = requested.Data + task.ApplicationID = requested.ApplicationID + case Pending, + QuotaBlocked, + Postponed: + task.UpdateUser = requested.UpdateUser + task.Name = requested.Name + task.Locator = requested.Locator + task.Data = requested.Data + task.Priority = requested.Priority + task.Policy = requested.Policy + task.TTL = requested.TTL + default: + // discarded. + } + err = db.Save(task).Error + if err != nil { + err = liberr.Wrap(err) + return + } return - } + }) return } // Delete a task. func (m *Manager) Delete(db *gorm.DB, id uint) (err error) { - task := &Task{} - err = db.First(task, id).Error - if err != nil { - return - } - m.action( - func() (err error) { - err = task.Delete(m.Client) - if err != nil { - return - } - err = db.Delete(task).Error + err = m.action(func() (err error) { + task := &Task{} + err = db.First(task, id).Error + if err != nil { return - }) + } + err = task.Delete(m.Client) + if err != nil { + return + } + err = db.Delete(task).Error + return + }) return } // Cancel a task. func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) { - task := &Task{} - err = db.First(task, id).Error - if err != nil { - return - } - m.action( + err = m.action( func() (err error) { + task := &Task{} + err = db.First(task, id).Error + if err != nil { + return + } switch task.State { case Succeeded, Failed, @@ -281,22 +274,29 @@ func (m *Manager) pause() { time.Sleep(d) } -// action enqueues an asynchronous action. -func (m *Manager) action(action func() error) { +// action executes an asynchronous action. +func (m *Manager) action(action func() error) (err error) { + d := time.Minute + ch := make(chan error) m.queue <- func() { - var err error defer func() { p := recover() if p != nil { - if pErr, cast := p.(error); cast { - err = pErr + if err, cast := p.(error); cast { + ch <- err } } - if err != nil { - Log.Error(err, "Action failed.") - } + close(ch) }() - err = action() + select { + case ch <- action(): + default: + } + } + select { + case err = <-ch: + case <-time.After(d): + err = &ActionTimeout{} } return } @@ -390,49 +390,6 @@ func (m *Manager) disconnected(list []*Task) (kept []*Task, err error) { return } -// FindRefs find referenced resources. -// - addon -// - extensions -// - kind -// - priority -// The priority is defaulted to the kind as needed. -func (m *Manager) findRefs(task *Task) (err error) { - if Settings.Disconnected { - return - } - if task.Addon != "" { - _, found := m.cluster.addons[task.Addon] - if !found { - err = &AddonNotFound{Name: task.Addon} - return - } - } - for _, name := range task.Extensions { - _, found := m.cluster.extensions[name] - if !found { - err = &ExtensionNotFound{Name: name} - return - } - } - if task.Kind == "" { - return - } - kind, found := m.cluster.tasks[task.Kind] - if !found { - err = &KindNotFound{Name: task.Kind} - return - } - if task.Priority == 0 { - task.Priority = kind.Spec.Priority - } - other := model.Data{Any: kind.Data()} - merged := task.Data.Merge(other) - if !merged { - task.Data = other - } - return -} - // selectAddon selects addon as needed. // The returned list has failed tasks removed. func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) { diff --git a/test/api/task/api_test.go b/test/api/task/api_test.go index 8fdab840..906568e7 100644 --- a/test/api/task/api_test.go +++ b/test/api/task/api_test.go @@ -2,7 +2,6 @@ package task import ( "testing" - "time" "github.com/konveyor/tackle2-hub/test/assert" ) @@ -74,16 +73,9 @@ func TestTaskCRUD(t *testing.T) { t.Errorf(err.Error()) } - for i := 5; i >= 0; i-- { - time.Sleep(time.Second) - _, err = Task.Get(r.ID) - if err != nil { - break - } - if i == 0 { - t.Errorf("Resource exits, but should be deleted: %v", r) - break - } + _, err = Task.Get(r.ID) + if err == nil { + t.Errorf("Resource exits, but should be deleted: %v", r) } }) } diff --git a/trigger/application.go b/trigger/application.go deleted file mode 100644 index 44823f63..00000000 --- a/trigger/application.go +++ /dev/null @@ -1,42 +0,0 @@ -package trigger - -import ( - "fmt" - - "github.com/konveyor/tackle2-hub/model" - tasking "github.com/konveyor/tackle2-hub/task" -) - -// Application trigger. -type Application struct { - Trigger -} - -// Created trigger. -func (r *Application) Created(m *model.Application) (err error) { - err = r.Updated(m) - return -} - -// Updated trigger. -func (r *Application) Updated(m *model.Application) (err error) { - if !Settings.Discovery.Enabled { - return - } - kinds, err := r.FindTasks(Settings.Discovery.Label) - if err != nil { - return - } - for _, kind := range kinds { - t := &tasking.Task{Task: &model.Task{}} - t.Kind = kind.Name - t.Name = fmt.Sprintf("%s-%s", m.Name, t.Name) - t.ApplicationID = &m.ID - t.State = tasking.Ready - err = r.TaskManager.Create(r.DB, t) - if err != nil { - return - } - } - return -} diff --git a/trigger/identity.go b/trigger/identity.go deleted file mode 100644 index e8416c6a..00000000 --- a/trigger/identity.go +++ /dev/null @@ -1,28 +0,0 @@ -package trigger - -import ( - "github.com/konveyor/tackle2-hub/model" -) - -// Identity trigger. -type Identity struct { - Trigger -} - -// Updated model created trigger. -func (r *Identity) Updated(m *model.Identity) (err error) { - tr := Application{ - Trigger: Trigger{ - TaskManager: r.TaskManager, - Client: r.Client, - DB: r.DB, - }, - } - for i := range m.Applications { - err = tr.Updated(&m.Applications[i]) - if err != nil { - return - } - } - return -} diff --git a/trigger/pkg.go b/trigger/pkg.go deleted file mode 100644 index 20bd344e..00000000 --- a/trigger/pkg.go +++ /dev/null @@ -1,53 +0,0 @@ -package trigger - -import ( - "context" - - liberr "github.com/jortel/go-utils/error" - crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha2" - "github.com/konveyor/tackle2-hub/settings" - tasking "github.com/konveyor/tackle2-hub/task" - "gorm.io/gorm" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" -) - -var ( - Settings = &settings.Settings -) - -// Trigger supports actions triggered by model changes. -type Trigger struct { - TaskManager *tasking.Manager - Client k8sclient.Client - DB *gorm.DB -} - -// FindTasks returns tasks with the specified label. -func (r *Trigger) FindTasks(label string) (matched []*crd.Task, err error) { - selector := labels.NewSelector() - req, _ := labels.NewRequirement( - label, - selection.Exists, - []string{}) - selector = selector.Add(*req) - options := &k8sclient.ListOptions{ - Namespace: Settings.Namespace, - LabelSelector: selector, - } - list := crd.TaskList{} - err = r.Client.List( - context.TODO(), - &list, - options) - if err != nil { - err = liberr.Wrap(err) - return - } - for i := range list.Items { - t := &list.Items[i] - matched = append(matched, t) - } - return -}