diff --git a/models/bots/runner.go b/models/bots/runner.go new file mode 100644 index 0000000000..f09f89d8ea --- /dev/null +++ b/models/bots/runner.go @@ -0,0 +1,108 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "fmt" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/timeutil" + "xorm.io/builder" +) + +// ErrRunnerNotExist represents an error for bot runner not exist +type ErrRunnerNotExist struct { + UUID string +} + +func (err ErrRunnerNotExist) Error() string { + return fmt.Sprintf("Bot runner [%s] is not exist", err.UUID) +} + +// Runner represents runner machines +type Runner struct { + ID int64 + UUID string `xorm:"CHAR(36) UNIQUE"` + Name string `xorm:"VARCHAR(32) UNIQUE"` + OS string `xorm:"VARCHAR(16) index"` // the runner running os + Arch string `xorm:"VARCHAR(16) index"` // the runner running architecture + Type string `xorm:"VARCHAR(16)"` + OwnerID int64 `xorm:"index"` // org level runner, 0 means system + RepoID int64 `xorm:"index"` // repo level runner, if orgid also is zero, then it's a global + Description string `xorm:"TEXT"` + Base int // 0 native 1 docker 2 virtual machine + RepoRange string // glob match which repositories could use this runner + Token string + LastOnline timeutil.TimeStamp `xorm:"index"` + Created timeutil.TimeStamp `xorm:"created"` +} + +func (Runner) TableName() string { + return "actions_runner" +} + +func init() { + db.RegisterModel(&Runner{}) +} + +type GetRunnerOptions struct { + RepoID int64 + OwnerID int64 +} + +func (opts GetRunnerOptions) toCond() builder.Cond { + cond := builder.NewCond() + if opts.RepoID > 0 { + cond = cond.And(builder.Eq{"repo_id": opts.RepoID}) + } + if opts.OwnerID > 0 { + cond = cond.And(builder.Eq{"owner_id": opts.OwnerID}) + } + cond = cond.Or(builder.Eq{"repo_id": 0, "owner_id": 0}) + return cond +} + +// GetUsableRunner returns the usable runner +func GetUsableRunner(opts GetRunnerOptions) (*Runner, error) { + var runner Runner + has, err := db.GetEngine(db.DefaultContext). + Where(opts.toCond()). + Asc("last_online"). + Get(&runner) + if err != nil { + return nil, err + } + if !has { + return nil, ErrRunnerNotExist{} + } + + return &runner, nil +} + +// GetRunnerByUUID returns a bot runner via uuid +func GetRunnerByUUID(uuid string) (*Runner, error) { + var runner Runner + has, err := db.GetEngine(db.DefaultContext).Where("uuid=?", uuid).Get(&runner) + if err != nil { + return nil, err + } else if !has { + return nil, ErrRunnerNotExist{ + UUID: uuid, + } + } + return &runner, nil +} + +// FindRunnersByRepoID returns all workers for the repository +func FindRunnersByRepoID(repoID int64) ([]*Runner, error) { + var runners []*Runner + err := db.GetEngine(db.DefaultContext).Where("repo_id=? OR repo_id=0", repoID). + Find(&runners) + if err != nil { + return nil, err + } + err = db.GetEngine(db.DefaultContext).Join("INNER", "repository", "repository.owner_id = bot_runner.owner_id").Find(&runners) + return runners, err +} diff --git a/models/bots/task.go b/models/bots/task.go new file mode 100644 index 0000000000..65eb9de77b --- /dev/null +++ b/models/bots/task.go @@ -0,0 +1,131 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "errors" + "fmt" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/webhook" + "code.gitea.io/gitea/modules/timeutil" + + "github.com/google/uuid" +) + +// TaskStatus represents a task status +type TaskStatus int + +// enumerate all the statuses of bot task +const ( + TaskPending TaskStatus = iota // wait for assign + TaskAssigned // assigned to a runner + TaskRunning // running + TaskFailed + TaskFinished + TaskCanceled + TaskTimeout +) + +// Task represnets bot tasks +type Task struct { + ID int64 + UUID string `xorm:"CHAR(36)"` + RepoID int64 `xorm:"index"` + TriggerUserID int64 + Ref string + CommitSHA string + Event webhook.HookEventType + Token string // token for this task + Grant string // permissions for this task + EventPayload string `xorm:"LONGTEXT"` + RunnerID int64 `xorm:"index"` + Status TaskStatus `xorm:"index"` + Created timeutil.TimeStamp `xorm:"created"` + StartTime timeutil.TimeStamp + EndTime timeutil.TimeStamp + Updated timeutil.TimeStamp `xorm:"updated"` +} + +// TableName represents a bot task +func (Task) TableName() string { + return "actions_task" +} + +// InsertTask inserts a bot task +func InsertTask(t *Task) error { + if t.UUID == "" { + t.UUID = uuid.New().String() + } + return db.Insert(db.DefaultContext, t) +} + +// UpdateTask updates bot task +func UpdateTask(t *Task, cols ...string) error { + _, err := db.GetEngine(db.DefaultContext).ID(t.ID).Cols(cols...).Update(t) + return err +} + +// ErrTaskNotExist represents an error for bot task not exist +type ErrTaskNotExist struct { + UUID string +} + +func (err ErrTaskNotExist) Error() string { + return fmt.Sprintf("Bot task [%s] is not exist", err.UUID) +} + +// GetTaskByUUID gets bot task by uuid +func GetTaskByUUID(taskUUID string) (*Task, error) { + var task Task + has, err := db.GetEngine(db.DefaultContext).Where("uuid=?", taskUUID).Get(&task) + if err != nil { + return nil, err + } else if !has { + return nil, ErrTaskNotExist{ + UUID: taskUUID, + } + } + return &task, nil +} + +// GetCurTask return the task for the bot +func GetCurTask(runnerID int64) (*Task, error) { + var tasks []Task + // FIXME: for test, just return all tasks + err := db.GetEngine(db.DefaultContext).Where("status=?", TaskPending).Find(&tasks) + // err := x.Where("runner_id = ?", botID). + // And("status=?", BotTaskPending). + // Find(&tasks) + if err != nil { + return nil, err + } + if len(tasks) == 0 { + return nil, nil + } + return &tasks[0], err +} + +// AssignTaskToRunner assign a task to a runner +func AssignTaskToRunner(taskID int64, runnerID int64) error { + cnt, err := db.GetEngine(db.DefaultContext). + Where("runner_id=0"). + And("id=?", taskID). + Cols("runner_id"). + Update(&Task{ + RunnerID: runnerID, + }) + if err != nil { + return err + } + if cnt != 1 { + return errors.New("assign faild") + } + return nil +} + +type TaskStage struct{} + +type StageStep struct{} diff --git a/models/migrations/v216.go b/models/migrations/v216.go new file mode 100644 index 0000000000..114cc6c4c7 --- /dev/null +++ b/models/migrations/v216.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package migrations + +import ( + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/xorm" +) + +func addBotTables(x *xorm.Engine) error { + type BotRunner struct { + ID int64 + UUID string `xorm:"CHAR(36) UNIQUE"` + Name string `xorm:"VARCHAR(32) UNIQUE"` + OS string `xorm:"VARCHAR(16) index"` // the runner running os + Arch string `xorm:"VARCHAR(16) index"` // the runner running architecture + Type string `xorm:"VARCHAR(16)"` + OwnerID int64 `xorm:"index"` // org level runner, 0 means system + RepoID int64 `xorm:"index"` // repo level runner, if orgid also is zero, then it's a global + Description string `xorm:"TEXT"` + Base int // 0 native 1 docker 2 virtual machine + RepoRange string // glob match which repositories could use this runner + Token string + LastOnline timeutil.TimeStamp + Created timeutil.TimeStamp `xorm:"created"` + } + + type BotTask struct { + ID int64 + UUID string `xorm:"CHAR(36)"` + RepoID int64 `xorm:"index"` + Type string `xorm:"VARCHAR(16)"` // 0 commit 1 pullrequest + Ref string + CommitSHA string + Event string + Token string // token for this task + Grant string // permissions for this task + EventPayload string `xorm:"LONGTEXT"` + RunnerID int64 `xorm:"index"` + Status int + Content string `xorm:"LONGTEXT"` + Created timeutil.TimeStamp `xorm:"created"` + StartTime timeutil.TimeStamp + EndTime timeutil.TimeStamp + Updated timeutil.TimeStamp `xorm:"updated"` + } + + return x.Sync2(new(BotRunner), new(BotTask)) +} diff --git a/modules/actions/gitea/action.go b/modules/actions/gitea/action.go new file mode 100644 index 0000000000..ef4a4a41dd --- /dev/null +++ b/modules/actions/gitea/action.go @@ -0,0 +1,79 @@ +package gitea + +import ( + "fmt" + "io" + "strings" + + "gopkg.in/yaml.v3" +) + +// ActionRunsUsing is the type of runner for the action +type ActionRunsUsing string + +func (a *ActionRunsUsing) UnmarshalYAML(unmarshal func(interface{}) error) error { + var using string + if err := unmarshal(&using); err != nil { + return err + } + + // Force input to lowercase for case insensitive comparison + format := ActionRunsUsing(strings.ToLower(using)) + switch format { + case ActionRunsUsingNode12, ActionRunsUsingDocker: + *a = format + default: + return fmt.Errorf(fmt.Sprintf("The runs.using key in action.yml must be one of: %v, got %s", []string{ + ActionRunsUsingDocker, + ActionRunsUsingNode12, + }, format)) + } + return nil +} + +const ( + // ActionRunsUsingNode12 for running with node12 + ActionRunsUsingNode12 = "node12" + // ActionRunsUsingDocker for running with docker + ActionRunsUsingDocker = "docker" +) + +// Action describes a metadata file for GitHub actions. The metadata filename must be either action.yml or action.yaml. The data in the metadata file defines the inputs, outputs and main entrypoint for your action. +type Action struct { + Name string `yaml:"name"` + Author string `yaml:"author"` + Description string `yaml:"description"` + Inputs map[string]Input `yaml:"inputs"` + Outputs map[string]Output `yaml:"outputs"` + Runs struct { + Using ActionRunsUsing `yaml:"using"` + Env map[string]string `yaml:"env"` + Main string `yaml:"main"` + Image string `yaml:"image"` + Entrypoint []string `yaml:"entrypoint"` + Args []string `yaml:"args"` + } `yaml:"runs"` + Branding struct { + Color string `yaml:"color"` + Icon string `yaml:"icon"` + } `yaml:"branding"` +} + +// Input parameters allow you to specify data that the action expects to use during runtime. GitHub stores input parameters as environment variables. Input ids with uppercase letters are converted to lowercase during runtime. We recommended using lowercase input ids. +type Input struct { + Description string `yaml:"description"` + Required bool `yaml:"required"` + Default string `yaml:"default"` +} + +// Output parameters allow you to declare data that an action sets. Actions that run later in a workflow can use the output data set in previously run actions. For example, if you had an action that performed the addition of two inputs (x + y = z), the action could output the sum (z) for other actions to use as an input. +type Output struct { + Description string `yaml:"description"` +} + +// ReadAction reads an action from a reader +func ReadAction(in io.Reader) (*Action, error) { + a := new(Action) + err := yaml.NewDecoder(in).Decode(a) + return a, err +} diff --git a/modules/actions/gitea/gitea.go b/modules/actions/gitea/gitea.go new file mode 100644 index 0000000000..8f7c0835e5 --- /dev/null +++ b/modules/actions/gitea/gitea.go @@ -0,0 +1,60 @@ +package gitea + +import ( + "code.gitea.io/gitea/models/webhook" + "code.gitea.io/gitea/modules/bot/runner" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/modules/json" +) + +func init() { + runner.RegisterRunnerType(new(GiteaRunner)) +} + +type GiteaRunner struct { +} + +func (gw *GiteaRunner) Name() string { + return "gitea" +} + +func (gw *GiteaRunner) Detect(commit *git.Commit, event webhook.HookEventType, ref string) (bool, string, error) { + tree, err := commit.SubTree(".gitea/workflow") + if err != nil { + return false, "", err + } + entries, err := tree.ListEntries() + if err != nil { + return false, "", err + } + + var wfs []*Workflow + for _, entry := range entries { + blob := entry.Blob() + rd, err := blob.DataAsync() + if err != nil { + return false, "", err + } + defer rd.Close() + wf, err := ReadWorkflow(rd) + if err != nil { + log.Error("ReadWorkflow file %s failed: %v", entry.Name(), err) + continue + } + + // FIXME: we have to convert the event type to github known name + if !util.IsStringInSlice(string(event), wf.On()) { + continue + } + + wfs = append(wfs, wf) + } + + wfBs, err := json.Marshal(wfs) + if err != nil { + return false, "", err + } + return true, string(wfBs), nil +} diff --git a/modules/actions/gitea/planner.go b/modules/actions/gitea/planner.go new file mode 100644 index 0000000000..6d80e79d49 --- /dev/null +++ b/modules/actions/gitea/planner.go @@ -0,0 +1,265 @@ +package gitea + +import ( + "io" + "io/ioutil" + "math" + "os" + "path/filepath" + "sort" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +// WorkflowPlanner contains methods for creating plans +type WorkflowPlanner interface { + PlanEvent(eventName string) *Plan + PlanJob(jobName string) *Plan + GetEvents() []string +} + +// Plan contains a list of stages to run in series +type Plan struct { + Stages []*Stage +} + +// Stage contains a list of runs to execute in parallel +type Stage struct { + Runs []*Run +} + +// Run represents a job from a workflow that needs to be run +type Run struct { + Workflow *Workflow + JobID string +} + +func (r *Run) String() string { + jobName := r.Job().Name + if jobName == "" { + jobName = r.JobID + } + return jobName +} + +// Job returns the job for this Run +func (r *Run) Job() *Job { + return r.Workflow.GetJob(r.JobID) +} + +// NewWorkflowPlanner will load a specific workflow or all workflows from a directory +func NewWorkflowPlanner(path string) (WorkflowPlanner, error) { + fi, err := os.Stat(path) + if err != nil { + return nil, err + } + + var files []os.FileInfo + var dirname string + + if fi.IsDir() { + log.Debugf("Loading workflows from '%s'", path) + dirname = path + files, err = ioutil.ReadDir(path) + } else { + log.Debugf("Loading workflow '%s'", path) + dirname, err = filepath.Abs(filepath.Dir(path)) + files = []os.FileInfo{fi} + } + if err != nil { + return nil, err + } + + wp := new(workflowPlanner) + for _, file := range files { + ext := filepath.Ext(file.Name()) + if ext == ".yml" || ext == ".yaml" { + f, err := os.Open(filepath.Join(dirname, file.Name())) + if err != nil { + return nil, err + } + + log.Debugf("Reading workflow '%s'", f.Name()) + workflow, err := ReadWorkflow(f) + if err != nil { + f.Close() + if err == io.EOF { + return nil, errors.WithMessagef(err, "unable to read workflow, %s file is empty", file.Name()) + } + return nil, err + } + if workflow.Name == "" { + workflow.Name = file.Name() + } + wp.workflows = append(wp.workflows, workflow) + f.Close() + } + } + + return wp, nil +} + +type workflowPlanner struct { + workflows []*Workflow +} + +// PlanEvent builds a new list of runs to execute in parallel for an event name +func (wp *workflowPlanner) PlanEvent(eventName string) *Plan { + plan := new(Plan) + if len(wp.workflows) == 0 { + log.Debugf("no events found for workflow: %s", eventName) + } + + for _, w := range wp.workflows { + for _, e := range w.When().Events { + if e.Type == eventName { + plan.mergeStages(createStages(w, w.GetJobIDs()...)) + } + } + } + return plan +} + +// PlanJob builds a new run to execute in parallel for a job name +func (wp *workflowPlanner) PlanJob(jobName string) *Plan { + plan := new(Plan) + if len(wp.workflows) == 0 { + log.Debugf("no jobs found for workflow: %s", jobName) + } + + for _, w := range wp.workflows { + plan.mergeStages(createStages(w, jobName)) + } + return plan +} + +// GetEvents gets all the events in the workflows file +func (wp *workflowPlanner) GetEvents() []string { + events := make([]string, 0) + for _, w := range wp.workflows { + found := false + for _, e := range events { + for _, we := range w.When().Events { + if e == we.Type { + found = true + break + } + } + if found { + break + } + } + + if !found { + for _, evt := range w.When().Events { + events = append(events, evt.Type) + } + } + } + + // sort the list based on depth of dependencies + sort.Slice(events, func(i, j int) bool { + return events[i] < events[j] + }) + + return events +} + +// MaxRunNameLen determines the max name length of all jobs +func (p *Plan) MaxRunNameLen() int { + maxRunNameLen := 0 + for _, stage := range p.Stages { + for _, run := range stage.Runs { + runNameLen := len(run.String()) + if runNameLen > maxRunNameLen { + maxRunNameLen = runNameLen + } + } + } + return maxRunNameLen +} + +// GetJobIDs will get all the job names in the stage +func (s *Stage) GetJobIDs() []string { + names := make([]string, 0) + for _, r := range s.Runs { + names = append(names, r.JobID) + } + return names +} + +// Merge stages with existing stages in plan +func (p *Plan) mergeStages(stages []*Stage) { + newStages := make([]*Stage, int(math.Max(float64(len(p.Stages)), float64(len(stages))))) + for i := 0; i < len(newStages); i++ { + newStages[i] = new(Stage) + if i >= len(p.Stages) { + newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...) + } else if i >= len(stages) { + newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...) + } else { + newStages[i].Runs = append(newStages[i].Runs, p.Stages[i].Runs...) + newStages[i].Runs = append(newStages[i].Runs, stages[i].Runs...) + } + } + p.Stages = newStages +} + +func createStages(w *Workflow, jobIDs ...string) []*Stage { + // first, build a list of all the necessary jobs to run, and their dependencies + jobDependencies := make(map[string][]string) + for len(jobIDs) > 0 { + newJobIDs := make([]string, 0) + for _, jID := range jobIDs { + // make sure we haven't visited this job yet + if _, ok := jobDependencies[jID]; !ok { + if job := w.GetJob(jID); job != nil { + jobDependencies[jID] = job.Needs() + newJobIDs = append(newJobIDs, job.Needs()...) + } + } + } + jobIDs = newJobIDs + } + + // next, build an execution graph + stages := make([]*Stage, 0) + for len(jobDependencies) > 0 { + stage := new(Stage) + for jID, jDeps := range jobDependencies { + // make sure all deps are in the graph already + if listInStages(jDeps, stages...) { + stage.Runs = append(stage.Runs, &Run{ + Workflow: w, + JobID: jID, + }) + delete(jobDependencies, jID) + } + } + if len(stage.Runs) == 0 { + log.Fatalf("Unable to build dependency graph!") + } + stages = append(stages, stage) + } + + return stages +} + +// return true iff all strings in srcList exist in at least one of the stages +func listInStages(srcList []string, stages ...*Stage) bool { + for _, src := range srcList { + found := false + for _, stage := range stages { + for _, search := range stage.GetJobIDs() { + if src == search { + found = true + } + } + } + if !found { + return false + } + } + return true +} diff --git a/modules/actions/gitea/workflow.go b/modules/actions/gitea/workflow.go new file mode 100644 index 0000000000..2a8a5d04f0 --- /dev/null +++ b/modules/actions/gitea/workflow.go @@ -0,0 +1,377 @@ +package gitea + +import ( + "fmt" + "io" + "reflect" + "regexp" + "strings" + + log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" +) + +// Workflow is the structure of the files in .github/workflows +type Workflow struct { + Name string `yaml:"name"` + RawWhen yaml.Node `yaml:"when"` + Env map[string]string `yaml:"env"` + Jobs map[string]*Job `yaml:"jobs"` + Defaults Defaults `yaml:"defaults"` +} + +type Event struct { + Type string + Ref string +} + +type When struct { + Events []Event +} + +func (w *When) Match(tp string) bool { + for _, evt := range w.Events { + if strings.EqualFold(tp, evt.Type) { + return true + } + } + return false +} + +// When events for the workflow +func (w *Workflow) When() *When { + switch w.RawWhen.Kind { + case yaml.ScalarNode: + var val string + err := w.RawWhen.Decode(&val) + if err != nil { + log.Fatal(err) + } + return &When{ + Events: []Event{ + { + Type: val, + }, + }, + } + case yaml.SequenceNode: + var vals []string + err := w.RawWhen.Decode(&vals) + if err != nil { + log.Fatal(err) + } + var when When + for _, val := range vals { + when.Events = append(when.Events, Event{ + Type: val, + }) + } + return &when + case yaml.MappingNode: + var val map[string]interface{} + err := w.RawWhen.Decode(&val) + if err != nil { + log.Fatal(err) + } + var keys []string + for k := range val { + keys = append(keys, k) + } + var when When + for _, val := range keys { + when.Events = append(when.Events, Event{ + Type: val, + }) + } + return &when + } + return nil +} + +// Job is the structure of one job in a workflow +type Job struct { + Name string `yaml:"name"` + RawNeeds yaml.Node `yaml:"needs"` + RawRunsOn yaml.Node `yaml:"runs-on"` + Env map[string]string `yaml:"env"` + If string `yaml:"if"` + Steps []*Step `yaml:"steps"` + TimeoutMinutes int64 `yaml:"timeout-minutes"` + Services map[string]*ContainerSpec `yaml:"services"` + Strategy *Strategy `yaml:"strategy"` + RawContainer yaml.Node `yaml:"container"` + Defaults Defaults `yaml:"defaults"` +} + +// Strategy for the job +type Strategy struct { + FailFast bool `yaml:"fail-fast"` + MaxParallel int `yaml:"max-parallel"` + Matrix map[string][]interface{} `yaml:"matrix"` +} + +// Default settings that will apply to all steps in the job or workflow +type Defaults struct { + Run RunDefaults `yaml:"run"` +} + +// Defaults for all run steps in the job or workflow +type RunDefaults struct { + Shell string `yaml:"shell"` + WorkingDirectory string `yaml:"working-directory"` +} + +// Container details for the job +func (j *Job) Container() *ContainerSpec { + var val *ContainerSpec + switch j.RawContainer.Kind { + case yaml.ScalarNode: + val = new(ContainerSpec) + err := j.RawContainer.Decode(&val.Image) + if err != nil { + log.Fatal(err) + } + case yaml.MappingNode: + val = new(ContainerSpec) + err := j.RawContainer.Decode(val) + if err != nil { + log.Fatal(err) + } + } + return val +} + +// Needs list for Job +func (j *Job) Needs() []string { + + switch j.RawNeeds.Kind { + case yaml.ScalarNode: + var val string + err := j.RawNeeds.Decode(&val) + if err != nil { + log.Fatal(err) + } + return []string{val} + case yaml.SequenceNode: + var val []string + err := j.RawNeeds.Decode(&val) + if err != nil { + log.Fatal(err) + } + return val + } + return nil +} + +// RunsOn list for Job +func (j *Job) RunsOn() []string { + + switch j.RawRunsOn.Kind { + case yaml.ScalarNode: + var val string + err := j.RawRunsOn.Decode(&val) + if err != nil { + log.Fatal(err) + } + return []string{val} + case yaml.SequenceNode: + var val []string + err := j.RawRunsOn.Decode(&val) + if err != nil { + log.Fatal(err) + } + return val + } + return nil +} + +// GetMatrixes returns the matrix cross product +func (j *Job) GetMatrixes() []map[string]interface{} { + matrixes := make([]map[string]interface{}, 0) + /*if j.Strategy != nil { + includes := make([]map[string]interface{}, 0) + for _, v := range j.Strategy.Matrix["include"] { + includes = append(includes, v.(map[string]interface{})) + } + delete(j.Strategy.Matrix, "include") + + excludes := make([]map[string]interface{}, 0) + for _, v := range j.Strategy.Matrix["exclude"] { + excludes = append(excludes, v.(map[string]interface{})) + } + delete(j.Strategy.Matrix, "exclude") + + matrixProduct := common.CartesianProduct(j.Strategy.Matrix) + + MATRIX: + for _, matrix := range matrixProduct { + for _, exclude := range excludes { + if commonKeysMatch(matrix, exclude) { + log.Debugf("Skipping matrix '%v' due to exclude '%v'", matrix, exclude) + continue MATRIX + } + } + matrixes = append(matrixes, matrix) + } + for _, include := range includes { + log.Debugf("Adding include '%v'", include) + matrixes = append(matrixes, include) + } + + } else { + matrixes = append(matrixes, make(map[string]interface{})) + }*/ + return matrixes +} + +func commonKeysMatch(a map[string]interface{}, b map[string]interface{}) bool { + for aKey, aVal := range a { + if bVal, ok := b[aKey]; ok && !reflect.DeepEqual(aVal, bVal) { + return false + } + } + return true +} + +// ContainerSpec is the specification of the container to use for the job +type ContainerSpec struct { + Image string `yaml:"image"` + Env map[string]string `yaml:"env"` + Ports []string `yaml:"ports"` + Volumes []string `yaml:"volumes"` + Options string `yaml:"options"` + Entrypoint string + Args string + Name string + Reuse bool +} + +// Step is the structure of one step in a job +type Step struct { + ID string `yaml:"id"` + If string `yaml:"if"` + Name string `yaml:"name"` + Uses string `yaml:"uses"` + Run string `yaml:"run"` + WorkingDirectory string `yaml:"working-directory"` + Shell string `yaml:"shell"` + Env map[string]string `yaml:"env"` + With map[string]string `yaml:"with"` + ContinueOnError bool `yaml:"continue-on-error"` + TimeoutMinutes int64 `yaml:"timeout-minutes"` +} + +// String gets the name of step +func (s *Step) String() string { + if s.Name != "" { + return s.Name + } else if s.Uses != "" { + return s.Uses + } else if s.Run != "" { + return s.Run + } + return s.ID +} + +// GetEnv gets the env for a step +func (s *Step) GetEnv() map[string]string { + rtnEnv := make(map[string]string) + for k, v := range s.Env { + rtnEnv[k] = v + } + for k, v := range s.With { + envKey := regexp.MustCompile("[^A-Z0-9-]").ReplaceAllString(strings.ToUpper(k), "_") + envKey = fmt.Sprintf("INPUT_%s", strings.ToUpper(envKey)) + rtnEnv[envKey] = v + } + return rtnEnv +} + +// ShellCommand returns the command for the shell +func (s *Step) ShellCommand() string { + shellCommand := "" + + switch s.Shell { + case "", "bash": + shellCommand = "bash --noprofile --norc -eo pipefail {0}" + case "pwsh": + shellCommand = "pwsh -command \"& '{0}'\"" + case "python": + shellCommand = "python {0}" + case "sh": + shellCommand = "sh -e -c {0}" + case "cmd": + shellCommand = "%ComSpec% /D /E:ON /V:OFF /S /C \"CALL \"{0}\"\"" + case "powershell": + shellCommand = "powershell -command \"& '{0}'\"" + default: + shellCommand = s.Shell + } + return shellCommand +} + +// StepType describes what type of step we are about to run +type StepType int + +const ( + // StepTypeRun is all steps that have a `run` attribute + StepTypeRun StepType = iota + + //StepTypeUsesDockerURL is all steps that have a `uses` that is of the form `docker://...` + StepTypeUsesDockerURL + + //StepTypeUsesActionLocal is all steps that have a `uses` that is a local action in a subdirectory + StepTypeUsesActionLocal + + //StepTypeUsesActionRemote is all steps that have a `uses` that is a reference to a github repo + StepTypeUsesActionRemote +) + +// Type returns the type of the step +func (s *Step) Type() StepType { + if s.Run != "" { + return StepTypeRun + } else if strings.HasPrefix(s.Uses, "docker://") { + return StepTypeUsesDockerURL + } else if strings.HasPrefix(s.Uses, "./") { + return StepTypeUsesActionLocal + } + return StepTypeUsesActionRemote +} + +// ReadWorkflow returns a list of jobs for a given workflow file reader +func ReadWorkflow(in io.Reader) (*Workflow, error) { + w := new(Workflow) + err := yaml.NewDecoder(in).Decode(w) + return w, err +} + +// GetJob will get a job by name in the workflow +func (w *Workflow) GetJob(jobID string) *Job { + for id, j := range w.Jobs { + if jobID == id { + if j.Name == "" { + j.Name = id + } + return j + } + } + return nil +} + +// GetJobIDs will get all the job names in the workflow +func (w *Workflow) GetJobIDs() []string { + ids := make([]string, 0) + for id := range w.Jobs { + ids = append(ids, id) + } + return ids +} + +func (w *Workflow) On() []string { + var evts []string + for _, job := range w.Jobs { + evts = append(evts, job.RunsOn()...) + } + return evts +} \ No newline at end of file diff --git a/modules/actions/gitea/workflow_test.go b/modules/actions/gitea/workflow_test.go new file mode 100644 index 0000000000..6df4a15e75 --- /dev/null +++ b/modules/actions/gitea/workflow_test.go @@ -0,0 +1,100 @@ +package gitea + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadWorkflow_StringEvent(t *testing.T) { + yaml := ` +name: local-action-docker-url +on: push + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: ./actions/docker-url +` + + workflow, err := ReadWorkflow(strings.NewReader(yaml)) + assert.NoError(t, err, "read workflow should succeed") + + assert.Len(t, workflow.On(), 1) + assert.Contains(t, workflow.On(), "push") +} + +func TestReadWorkflow_ListEvent(t *testing.T) { + yaml := ` +name: local-action-docker-url +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: ./actions/docker-url +` + + workflow, err := ReadWorkflow(strings.NewReader(yaml)) + assert.NoError(t, err, "read workflow should succeed") + + assert.Len(t, workflow.On(), 2) + assert.Contains(t, workflow.On(), "push") + assert.Contains(t, workflow.On(), "pull_request") +} + +func TestReadWorkflow_MapEvent(t *testing.T) { + yaml := ` +name: local-action-docker-url +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: ./actions/docker-url +` + + workflow, err := ReadWorkflow(strings.NewReader(yaml)) + assert.NoError(t, err, "read workflow should succeed") + assert.Len(t, workflow.On(), 2) + assert.Contains(t, workflow.On(), "push") + assert.Contains(t, workflow.On(), "pull_request") +} + +func TestReadWorkflow_StringContainer(t *testing.T) { + yaml := ` +name: local-action-docker-url + +jobs: + test: + container: nginx:latest + runs-on: ubuntu-latest + steps: + - uses: ./actions/docker-url + test2: + container: + image: nginx:latest + env: + foo: bar + runs-on: ubuntu-latest + steps: + - uses: ./actions/docker-url +` + + workflow, err := ReadWorkflow(strings.NewReader(yaml)) + assert.NoError(t, err, "read workflow should succeed") + assert.Len(t, workflow.Jobs, 2) + assert.Contains(t, workflow.Jobs["test"].Container().Image, "nginx:latest") + assert.Contains(t, workflow.Jobs["test2"].Container().Image, "nginx:latest") + assert.Contains(t, workflow.Jobs["test2"].Container().Env["foo"], "bar") +} diff --git a/modules/actions/github/github.go b/modules/actions/github/github.go new file mode 100644 index 0000000000..b39539fa0d --- /dev/null +++ b/modules/actions/github/github.go @@ -0,0 +1,165 @@ +package github + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + + bot_model "code.gitea.io/gitea/models/bot" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/models/webhook" + "code.gitea.io/gitea/modules/bot/runner" + "code.gitea.io/gitea/modules/git" + + //"code.gitea.io/gitea/modules/log" + //"code.gitea.io/gitea/modules/util" + + "github.com/nektos/act/pkg/model" + act_runner "github.com/nektos/act/pkg/runner" +) + +func init() { + runner.RegisterRunnerType(new(GithubRunner)) +} + +type GithubRunner struct { +} + +func (gw *GithubRunner) Name() string { + return "github" +} + +func (gw *GithubRunner) Detect(commit *git.Commit, event webhook.HookEventType, ref string) (bool, string, error) { + tree, err := commit.SubTree(".github/workflow") + if err != nil { + return false, "", err + } + entries, err := tree.ListEntries() + if err != nil { + return false, "", err + } + + var content = make(map[string]string) + for _, entry := range entries { + blob := entry.Blob() + rd, err := blob.DataAsync() + if err != nil { + return false, "", err + } + + bs, err := io.ReadAll(rd) + rd.Close() + if err != nil { + return false, "", err + } + content[entry.Name()] = string(bs) + } + + res, err := json.Marshal(content) + if err != nil { + return false, "", err + } + return true, string(res), nil +} + +func (gw *GithubRunner) Run(task *bot_model.Task) error { + tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%d", task.ID)) + if err != nil { + return err + } + + var files = make(map[string]string) + if err := json.Unmarshal([]byte(task.Content), &files); err != nil { + return err + } + for name, content := range files { + f, err := os.Create(filepath.Join(tmpDir, name)) + if err != nil { + return err + } + if _, err := f.WriteString(content); err != nil { + f.Close() + return err + } + f.Close() + } + + repo, err := repo_model.GetRepositoryByID(task.RepoID) + if err != nil { + return err + } + + evtFilePath := filepath.Join(tmpDir, "event.json") + evtFile, err := os.Create(evtFilePath) + if err != nil { + return err + } + + if _, err := evtFile.WriteString(task.EventPayload); err != nil { + evtFile.Close() + return err + } + evtFile.Close() + + planner, err := model.NewWorkflowPlanner(tmpDir, false) + if err != nil { + return err + } + plan := planner.PlanEvent(task.Event) + + actor, err := user_model.GetUserByID(task.TriggerUserID) + if err != nil { + return err + } + + // run the plan + config := &act_runner.Config{ + Actor: actor.LoginName, + EventName: task.Event, + EventPath: evtFilePath, + DefaultBranch: repo.DefaultBranch, + /*ForcePull: input.forcePull, + ForceRebuild: input.forceRebuild, + ReuseContainers: input.reuseContainers, + Workdir: input.Workdir(), + BindWorkdir: input.bindWorkdir, + LogOutput: !input.noOutput,*/ + //Env: envs, + Secrets: map[string]string{ + "token": "614e597274a527b6fcf6ddfe45def79430126f08", + }, + //InsecureSecrets: input.insecureSecrets,*/ + Platforms: map[string]string{ + "ubuntu-latest": "node:12-buster-slim", + "ubuntu-20.04": "node:12-buster-slim", + "ubuntu-18.04": "node:12-buster-slim", + }, + /*Privileged: input.privileged, + UsernsMode: input.usernsMode, + ContainerArchitecture: input.containerArchitecture, + ContainerDaemonSocket: input.containerDaemonSocket, + UseGitIgnore: input.useGitIgnore,*/ + GitHubInstance: "gitea.com", + /*ContainerCapAdd: input.containerCapAdd, + ContainerCapDrop: input.containerCapDrop, + AutoRemove: input.autoRemove, + ArtifactServerPath: input.artifactServerPath, + ArtifactServerPort: input.artifactServerPort,*/ + } + r, err := act_runner.New(config) + if err != nil { + return err + } + + //ctx, cancel := context.WithTimeout(context.Background(), ) + + executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { + //cancel() + return nil + }) + return executor(context.Background()) +} diff --git a/modules/actions/runner/runner.go b/modules/actions/runner/runner.go new file mode 100644 index 0000000000..2a9540ad76 --- /dev/null +++ b/modules/actions/runner/runner.go @@ -0,0 +1,27 @@ +package runner + +import ( + bots_model "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/models/webhook" + "code.gitea.io/gitea/modules/git" +) + +var runnerTypes = make(map[string]RunnerType) + +type RunnerType interface { + Name() string + Detect(commit *git.Commit, event webhook.HookEventType, ref string) (bool, string, error) + Run(task *bots_model.Task) error +} + +func RegisterRunnerType(runnerType RunnerType) { + runnerTypes[runnerType.Name()] = runnerType +} + +func GetRunnerType(name string) RunnerType { + return runnerTypes[name] +} + +func GetRunnerTypes() map[string]RunnerType { + return runnerTypes +} diff --git a/modules/context/response.go b/modules/context/response.go index 112964dbe1..24844baa08 100644 --- a/modules/context/response.go +++ b/modules/context/response.go @@ -5,6 +5,9 @@ package context import ( + "bufio" + "errors" + "net" "net/http" ) @@ -84,6 +87,14 @@ func (r *Response) Before(f func(ResponseWriter)) { r.befores = append(r.befores, f) } +func (r *Response) Hijack() (net.Conn, *bufio.ReadWriter, error) { + if h, ok := r.ResponseWriter.(http.Hijacker); ok { + return h.Hijack() + } + + return nil, nil, errors.New("unimplemented http.Hijacker ") +} + // NewResponse creates a response func NewResponse(resp http.ResponseWriter) *Response { if v, ok := resp.(*Response); ok { diff --git a/modules/notification/bots/bots.go b/modules/notification/bots/bots.go new file mode 100644 index 0000000000..8fc31ed3cf --- /dev/null +++ b/modules/notification/bots/bots.go @@ -0,0 +1,209 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "context" + "encoding/json" + "fmt" + + "code.gitea.io/gitea/models" + bots_model "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/models/perm" + repo_model "code.gitea.io/gitea/models/repo" + user_model "code.gitea.io/gitea/models/user" + "code.gitea.io/gitea/models/webhook" + "code.gitea.io/gitea/modules/convert" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/repository" + "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" + bots_service "code.gitea.io/gitea/services/bots" +) + +type botsNotifier struct { + base.NullNotifier +} + +var _ base.Notifier = &botsNotifier{} + +// NewNotifier create a new botsNotifier notifier +func NewNotifier() base.Notifier { + return &botsNotifier{} +} + +func notifyIssue(issue *models.Issue, doer *user_model.User, evt webhook.HookEventType, payload string) { + err := issue.LoadRepo(db.DefaultContext) + if err != nil { + log.Error("issue.LoadRepo: %v", err) + return + } + if issue.Repo.IsEmpty || issue.Repo.IsArchived { + return + } + + ref := issue.Ref + if ref == "" { + ref = issue.Repo.DefaultBranch + } + + gitRepo, err := git.OpenRepository(context.Background(), issue.Repo.RepoPath()) + if err != nil { + log.Error("issue.LoadRepo: %v", err) + return + } + defer gitRepo.Close() + + // Get the commit object for the ref + commit, err := gitRepo.GetCommit(ref) + if err != nil { + log.Error("gitRepo.GetCommit: %v", err) + return + } + + task := bots_model.Task{ + RepoID: issue.RepoID, + TriggerUserID: doer.ID, + Event: evt, + EventPayload: payload, + Status: bots_model.TaskPending, + Ref: ref, + CommitSHA: commit.ID.String(), + } + + if err := bots_model.InsertTask(&task); err != nil { + log.Error("InsertBotTask: %v", err) + } else { + bots_service.PushToQueue(&task) + } +} + +// TODO: implement all events +func (a *botsNotifier) NotifyNewIssue(issue *models.Issue, mentions []*user_model.User) { + payload := map[string]interface{}{ + "issue": map[string]interface{}{ + "number": issue.Index, + }, + } + bs, err := json.Marshal(payload) + if err != nil { + log.Error("NotifyNewIssue: %v", err) + return + } + notifyIssue(issue, issue.Poster, webhook.HookEventIssues, string(bs)) +} + +// NotifyIssueChangeStatus notifies close or reopen issue to notifiers +func (a *botsNotifier) NotifyIssueChangeStatus(doer *user_model.User, issue *models.Issue, actionComment *models.Comment, closeOrReopen bool) { +} + +func (a *botsNotifier) NotifyIssueChangeLabels(doer *user_model.User, issue *models.Issue, + addedLabels []*models.Label, removedLabels []*models.Label, +) { + payload := map[string]interface{}{ + "issue": map[string]interface{}{ + "number": issue.Index, + }, + } + bs, err := json.Marshal(payload) + if err != nil { + log.Error("NotifyNewIssue: %v", err) + return + } + notifyIssue(issue, doer, webhook.HookEventIssueLabel, string(bs)) +} + +// NotifyCreateIssueComment notifies comment on an issue to notifiers +func (a *botsNotifier) NotifyCreateIssueComment(doer *user_model.User, repo *repo_model.Repository, + issue *models.Issue, comment *models.Comment, mentions []*user_model.User) { +} + +func (a *botsNotifier) NotifyNewPullRequest(pull *models.PullRequest, mentions []*user_model.User) { +} + +func (a *botsNotifier) NotifyRenameRepository(doer *user_model.User, repo *repo_model.Repository, oldRepoName string) { +} + +func (a *botsNotifier) NotifyTransferRepository(doer *user_model.User, repo *repo_model.Repository, oldOwnerName string) { +} + +func (a *botsNotifier) NotifyCreateRepository(doer *user_model.User, u *user_model.User, repo *repo_model.Repository) { +} + +func (a *botsNotifier) NotifyForkRepository(doer *user_model.User, oldRepo, repo *repo_model.Repository) { +} + +func (a *botsNotifier) NotifyPullRequestReview(pr *models.PullRequest, review *models.Review, comment *models.Comment, mentions []*user_model.User) { +} + +func (*botsNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *user_model.User) { +} + +func (a *botsNotifier) NotifyPushCommits(pusher *user_model.User, repo *repo_model.Repository, opts *repository.PushUpdateOptions, commits *repository.PushCommits) { + ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("webhook.NotifyPushCommits User: %s[%d] in %s[%d]", pusher.Name, pusher.ID, repo.FullName(), repo.ID)) + defer finished() + + apiPusher := convert.ToUser(pusher, nil) + apiCommits, apiHeadCommit, err := commits.ToAPIPayloadCommits(ctx, repo.RepoPath(), repo.HTMLURL()) + if err != nil { + log.Error("commits.ToAPIPayloadCommits failed: %v", err) + return + } + + payload := &api.PushPayload{ + Ref: opts.RefFullName, + Before: opts.OldCommitID, + After: opts.NewCommitID, + CompareURL: setting.AppURL + commits.CompareURL, + Commits: apiCommits, + HeadCommit: apiHeadCommit, + Repo: convert.ToRepo(repo, perm.AccessModeOwner), + Pusher: apiPusher, + Sender: apiPusher, + } + + bs, err := json.Marshal(payload) + if err != nil { + log.Error("json.Marshal(payload) failed: %v", err) + return + } + + task := bots_model.Task{ + RepoID: repo.ID, + TriggerUserID: pusher.ID, + Event: webhook.HookEventPush, + EventPayload: string(bs), + Status: bots_model.TaskPending, + } + + if err := bots_model.InsertTask(&task); err != nil { + log.Error("InsertBotTask: %v", err) + } else { + bots_service.PushToQueue(&task) + } +} + +func (a *botsNotifier) NotifyCreateRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName, refID string) { +} + +func (a *botsNotifier) NotifyDeleteRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName string) { +} + +func (a *botsNotifier) NotifySyncPushCommits(pusher *user_model.User, repo *repo_model.Repository, opts *repository.PushUpdateOptions, commits *repository.PushCommits) { +} + +func (a *botsNotifier) NotifySyncCreateRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName, refID string) { +} + +func (a *botsNotifier) NotifySyncDeleteRef(doer *user_model.User, repo *repo_model.Repository, refType, refFullName string) { +} + +func (a *botsNotifier) NotifyNewRelease(rel *models.Release) { +} diff --git a/modules/notification/notification.go b/modules/notification/notification.go index a117a60815..e122dcf604 100644 --- a/modules/notification/notification.go +++ b/modules/notification/notification.go @@ -13,6 +13,7 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/notification/action" "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/notification/bots" "code.gitea.io/gitea/modules/notification/indexer" "code.gitea.io/gitea/modules/notification/mail" "code.gitea.io/gitea/modules/notification/mirror" @@ -40,6 +41,7 @@ func NewContext() { RegisterNotifier(webhook.NewNotifier()) RegisterNotifier(action.NewNotifier()) RegisterNotifier(mirror.NewNotifier()) + RegisterNotifier(bots.NewNotifier()) } // NotifyNewWikiPage notifies creating new wiki pages to notifiers diff --git a/routers/api/bots/bots.go b/routers/api/bots/bots.go new file mode 100644 index 0000000000..0e91c19617 --- /dev/null +++ b/routers/api/bots/bots.go @@ -0,0 +1,149 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "time" + + bots_model "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/web" + + "github.com/gorilla/websocket" +) + +func Routes() *web.Route { + r := web.NewRoute() + r.Get("/", Serve) + return r +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 4096, + WriteBufferSize: 4096, + EnableCompression: true, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +var pongWait = 60 * time.Second + +type Message struct { + Version int // + Type int // message type, 1 register 2 error + RunnerUUID string // runner uuid + ErrCode int // error code + ErrContent string // errors message +} + +func Serve(w http.ResponseWriter, r *http.Request) { + log.Trace("websocket init request begin from %s", r.RemoteAddr) + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error("websocket upgrade failed: %v", err) + return + } + defer c.Close() + log.Trace("websocket upgrade from %s successfully", r.RemoteAddr) + + c.SetReadDeadline(time.Now().Add(pongWait)) + c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + +MESSAGE_BUMP: + for { + // read log from client + mt, message, err := c.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || + websocket.IsCloseError(err, websocket.CloseNormalClosure) { + c.Close() + break + } + if !strings.Contains(err.Error(), "i/o timeout") { + log.Error("websocket[%s] read failed: %#v", r.RemoteAddr, err) + } + break + } else { + log.Trace("websocket[%s] received message: %s", r.RemoteAddr, message) + } + + // read message first + var msg Message + if err = json.Unmarshal(message, &msg); err != nil { + log.Error("websocket[%s] unmarshal failed: %#v", r.RemoteAddr, err) + break + } + + switch msg.Version { + case 1: + switch msg.Type { + case 1: + log.Info("websocket[%s] registered", r.RemoteAddr) + runner, err := bots_model.GetRunnerByUUID(msg.RunnerUUID) + if err != nil { + if !errors.Is(err, bots_model.ErrRunnerNotExist{}) { + log.Error("websocket[%s] get runner [%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err) + break + } + err = c.WriteMessage(mt, message) + if err != nil { + log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + break + } + } else { + fmt.Printf("-----%v\n", runner) + // TODO: handle read message + err = c.WriteMessage(mt, message) + if err != nil { + log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + break + } + } + default: + returnMsg := Message{ + Version: 1, + Type: 2, + ErrCode: 1, + ErrContent: "type is not supported", + } + bs, err := json.Marshal(&returnMsg) + if err != nil { + log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) + break MESSAGE_BUMP + } + err = c.WriteMessage(mt, bs) + if err != nil { + log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + break MESSAGE_BUMP + } + default: + returnMsg := Message{ + Version: 1, + Type: 2, + ErrCode: 1, + ErrContent: "version is not supported", + } + bs, err := json.Marshal(&returnMsg) + if err != nil { + log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) + break MESSAGE_BUMP + } + err = c.WriteMessage(mt, bs) + if err != nil { + log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + break MESSAGE_BUMP + } + + // TODO: find new task and send to client + } +} diff --git a/routers/init.go b/routers/init.go index fecc5c439c..80c44bf61b 100644 --- a/routers/init.go +++ b/routers/init.go @@ -31,6 +31,7 @@ import ( "code.gitea.io/gitea/modules/translation" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" + bots_router "code.gitea.io/gitea/routers/api/bots" packages_router "code.gitea.io/gitea/routers/api/packages" apiv1 "code.gitea.io/gitea/routers/api/v1" "code.gitea.io/gitea/routers/common" @@ -39,6 +40,7 @@ import ( "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/auth/source/oauth2" "code.gitea.io/gitea/services/automerge" + bots_service "code.gitea.io/gitea/services/bots" "code.gitea.io/gitea/services/cron" "code.gitea.io/gitea/services/mailer" markup_service "code.gitea.io/gitea/services/markup" @@ -160,6 +162,7 @@ func GlobalInitInstalled(ctx context.Context) { mustInit(pull_service.Init) mustInit(automerge.Init) mustInit(task.Init) + mustInit(bots_service.Init) mustInit(repo_migrations.Init) eventsource.GetManager().Init() @@ -195,5 +198,6 @@ func NormalRoutes(ctx context.Context) *web.Route { // This implements the OCI API (Note this is not preceded by /api but is instead /v2) r.Mount("/v2", packages_router.ContainerRoutes(ctx)) } + r.Mount("/api/actions", bots_router.Routes()) return r } diff --git a/services/bots/bots.go b/services/bots/bots.go new file mode 100644 index 0000000000..3970110565 --- /dev/null +++ b/services/bots/bots.go @@ -0,0 +1,62 @@ +// Copyright 2022 Gitea. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "fmt" + + bots_model "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/queue" + //"code.gitea.io/gitea/modules/json" +) + +// taskQueue is a global queue of tasks +var taskQueue queue.Queue + +// PushToQueue +func PushToQueue(task *bots_model.Task) { + taskQueue.Push(task) +} + +// Dispatch assign a task to a runner +func Dispatch(task *bots_model.Task) (*bots_model.Runner, error) { + runner, err := bots_model.GetUsableRunner(bots_model.GetRunnerOptions{ + RepoID: task.RepoID, + }) + if err != nil { + return nil, err + } + + return runner, bots_model.AssignTaskToRunner(task.ID, runner.ID) +} + +// Init will start the service to get all unfinished tasks and run them +func Init() error { + taskQueue = queue.CreateQueue("actions_task", handle, &bots_model.Task{}) + if taskQueue == nil { + return fmt.Errorf("Unable to create Task Queue") + } + + go graceful.GetManager().RunWithShutdownFns(taskQueue.Run) + + return nil +} + +func handle(data ...queue.Data) []queue.Data { + var unhandled []queue.Data + for _, datum := range data { + task := datum.(*bots_model.Task) + runner, err := Dispatch(task) + if err != nil { + log.Error("Run task failed: %v", err) + unhandled = append(unhandled, task) + } else { + log.Trace("task %v assigned to %s", task.UUID, runner.UUID) + } + } + return unhandled +}