From 530faf72278da8849201ba2296ed6fa8f7816dd0 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Mon, 10 Oct 2022 15:35:26 +0800 Subject: [PATCH] feat: assign task to runner --- models/bots/run.go | 12 ++++-- models/bots/run_job.go | 3 ++ models/bots/task.go | 71 +++++++++++++++++++++++++++++++ routers/api/bots/grpc/grpc.go | 39 +---------------- routers/api/bots/grpc/runner.go | 3 +- routers/api/bots/runner/runner.go | 70 ++++++++++++++++++++++-------- routers/api/bots/runner/unary.go | 48 +++++++++++++++++++++ 7 files changed, 185 insertions(+), 61 deletions(-) create mode 100644 routers/api/bots/runner/unary.go diff --git a/models/bots/run.go b/models/bots/run.go index 19f5e7e0d1..ffece5f3e7 100644 --- a/models/bots/run.go +++ b/models/bots/run.go @@ -10,6 +10,7 @@ import ( "code.gitea.io/gitea/core" "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/models/webhook" "code.gitea.io/gitea/modules/timeutil" @@ -21,9 +22,10 @@ import ( type Run struct { ID int64 Name string - RepoID int64 `xorm:"index unique(repo_workflow_index)"` - WorkflowID string `xorm:"index unique(repo_workflow_index)"` // the name of workflow file - Index int64 `xorm:"index unique(repo_workflow_index)"` // a unique number for each run of a particular workflow in a repository + RepoID int64 `xorm:"index unique(repo_workflow_index)"` + Repo *repo_model.Repository `xorm:"-"` + WorkflowID string `xorm:"index unique(repo_workflow_index)"` // the name of workflow file + Index int64 `xorm:"index unique(repo_workflow_index)"` // a unique number for each run of a particular workflow in a repository TriggerUserID int64 TriggerUser *user_model.User `xorm:"-"` Ref string @@ -76,12 +78,14 @@ func InsertRun(run *Run, jobs []*jobparser.SingleWorkflow) error { runJobs := make([]*RunJob, 0, len(jobs)) for _, v := range jobs { - _, job := v.Job() + id, job := v.Job() payload, _ := v.Marshal() runJobs = append(runJobs, &RunJob{ RunID: run.ID, Name: job.Name, + Ready: true, // TODO: should be false if there are needs to satisfy WorkflowPayload: payload, + JobID: id, Needs: nil, // TODO: analyse needs RunsOn: job.RunsOn(), TaskID: 0, diff --git a/models/bots/run_job.go b/models/bots/run_job.go index 361803e4e5..3a477eaa27 100644 --- a/models/bots/run_job.go +++ b/models/bots/run_job.go @@ -15,7 +15,10 @@ type RunJob struct { ID int64 RunID int64 Name string + Ready bool // ready to be executed + Attempt int64 WorkflowPayload []byte + JobID string // job id in workflow, not job's id Needs []int64 `xorm:"JSON TEXT"` RunsOn []string `xorm:"JSON TEXT"` TaskID int64 // the latest task of the job diff --git a/models/bots/task.go b/models/bots/task.go index 1fa99758bb..efad43168e 100644 --- a/models/bots/task.go +++ b/models/bots/task.go @@ -5,6 +5,7 @@ package bots import ( + "code.gitea.io/gitea/core" "code.gitea.io/gitea/models/db" "code.gitea.io/gitea/modules/timeutil" ) @@ -31,3 +32,73 @@ func init() { func (Task) TableName() string { return "bots_task" } + +func CreateTask(runner *Runner) (task *Task, job *RunJob, run *Run, ok bool, err error) { + ctx, commiter, err := db.TxContext() + if err != nil { + return + } + defer commiter.Close() + + var jobs []*RunJob + if err = db.GetEngine(ctx).Where("task_id = 0 AND ready = true").OrderBy("id").Find(jobs); err != nil { + return + } + + labels := append(runner.AgentLabels, runner.CustomLabels...) + for _, v := range jobs { + if isSubset(v.RunsOn, labels) { + job = v + break + } + } + if job == nil { + return + } + + now := timeutil.TimeStampNow() + job.Attempt++ + job.Started = now + job.Status = core.StatusRunning + + task = &Task{ + JobID: job.ID, + Attempt: job.Attempt, + RunnerID: runner.ID, + Started: now, + } + + if err = db.Insert(ctx, task); err != nil { + return + } + + job.TaskID = task.ID + if _, err = db.GetEngine(ctx).ID(job.ID).Update(job); err != nil { + return + } + + run = &Run{} + if _, err = db.GetEngine(ctx).ID(job.RunID).Get(run); err != nil { + return + } + + if err = commiter.Commit(); err != nil { + return + } + + ok = true + return +} + +func isSubset(set, subset []string) bool { + m := make(map[string]struct{}, len(set)) + for _, v := range set { + m[v] = struct{}{} + } + for _, v := range subset { + if _, ok := m[v]; !ok { + return false + } + } + return true +} diff --git a/routers/api/bots/grpc/grpc.go b/routers/api/bots/grpc/grpc.go index dc70bf6257..a59672f4f5 100644 --- a/routers/api/bots/grpc/grpc.go +++ b/routers/api/bots/grpc/grpc.go @@ -5,14 +5,11 @@ package grpc import ( - "context" "net/http" - "strings" - - "code.gitea.io/gitea/models/bots" "gitea.com/gitea/proto-go/ping/v1/pingv1connect" "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" + "github.com/bufbuild/connect-go" grpcreflect "github.com/bufbuild/connect-grpcreflect-go" "google.golang.org/grpc/health/grpc_health_v1" @@ -44,37 +41,3 @@ func V1AlphaRoute() (string, http.Handler) { compress1KB, ) } - -var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unaryFunc connect.UnaryFunc) connect.UnaryFunc { - return func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) { - if methodName(request) == "Register" { - return unaryFunc(ctx, request) - } - uuid := request.Header().Get("X-Runner-Token") // TODO: shouldn't be X-Runner-Token, maybe X-Runner-UUID - // TODO: get runner from db, refuse request if it doesn't exist - r := &bots.Runner{ - UUID: uuid, - } - ctx = context.WithValue(ctx, runnerCtxKey{}, r) - return unaryFunc(ctx, request) - } -})) - -func methodName(req connect.AnyRequest) string { - splits := strings.Split(req.Spec().Procedure, "/") - if len(splits) > 0 { - return splits[len(splits)-1] - } - return "" -} - -type runnerCtxKey struct{} - -func GetRunner(ctx context.Context) *bots.Runner { - if v := ctx.Value(runnerCtxKey{}); v != nil { - if r, ok := v.(*bots.Runner); ok { - return r - } - } - return nil -} diff --git a/routers/api/bots/grpc/runner.go b/routers/api/bots/grpc/runner.go index a4e25d649f..c084f99488 100644 --- a/routers/api/bots/grpc/runner.go +++ b/routers/api/bots/grpc/runner.go @@ -9,7 +9,6 @@ import ( "code.gitea.io/gitea/routers/api/bots/runner" "code.gitea.io/gitea/routers/api/bots/scheduler/queue" - "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" ) @@ -21,6 +20,6 @@ func RunnerRoute() (string, http.Handler) { return runnerv1connect.NewRunnerServiceHandler( runnerService, compress1KB, - withRunner, + runner.WithRunner, ) } diff --git a/routers/api/bots/runner/runner.go b/routers/api/bots/runner/runner.go index d10a878929..8c46d56f47 100644 --- a/routers/api/bots/runner/runner.go +++ b/routers/api/bots/runner/runner.go @@ -7,6 +7,7 @@ package runner import ( "context" "errors" + "fmt" "net/url" "strings" @@ -14,12 +15,15 @@ import ( bots_model "code.gitea.io/gitea/models/bots" "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/models/user" - "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/json" runnerv1 "gitea.com/gitea/proto-go/runner/v1" "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" "github.com/bufbuild/connect-go" gouuid "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" ) var _ runnerv1connect.RunnerServiceClient = (*Service)(nil) @@ -121,28 +125,19 @@ func (s *Service) Register( return res, nil } -// Request requests the next available build stage for execution. +// FetchTask assigns a task to the runner func (s *Service) FetchTask( ctx context.Context, req *connect.Request[runnerv1.FetchTaskRequest], ) (*connect.Response[runnerv1.FetchTaskResponse], error) { - log.Debug("manager: request queue item") + runner := GetRunner(ctx) - task, err := s.Scheduler.Request(ctx, core.Filter{ - OS: req.Msg.Os, - Arch: req.Msg.Arch, - }) - if err != nil && ctx.Err() != nil { - log.Debug("manager: context canceled") - return nil, err + var task *runnerv1.Task + if t, ok, err := s.pickTask(ctx, runner); err != nil { + return nil, status.Errorf(codes.Internal, "pick task: %v", err) + } else if ok { + task = t } - if err != nil { - log.Warn("manager: request queue item error") - return nil, err - } - - // TODO: update task and check data lock - task.Machine = req.Msg.Os res := connect.NewResponse(&runnerv1.FetchTaskResponse{ Task: task, @@ -167,3 +162,44 @@ func (s *Service) UpdateLog( res := connect.NewResponse(&runnerv1.UpdateLogResponse{}) return res, nil } + +func (s *Service) pickTask(ctx context.Context, runner *bots_model.Runner) (*runnerv1.Task, bool, error) { + t, job, run, ok, err := bots_model.CreateTask(runner) + if err != nil { + return nil, false, fmt.Errorf("CreateTask: %w", err) + } + if !ok { + return nil, false, nil + } + + event := map[string]interface{}{} + _ = json.Unmarshal([]byte(run.EventPayload), &event) + + // TODO: more context in https://docs.github.com/cn/actions/learn-github-actions/contexts#github-context + taskContext, _ := structpb.NewStruct(map[string]interface{}{ + "event": event, + "run_id": fmt.Sprint(run.ID), + "run_number": fmt.Sprint(run.Index), + "run_attempt": fmt.Sprint(job.Attempt), + "actor": fmt.Sprint(run.TriggerUser.Name), + "repository": fmt.Sprint(run.Repo.Name), + "event_name": fmt.Sprint(run.Event.Event()), + "sha": fmt.Sprint(run.CommitSHA), + "ref": fmt.Sprint(run.Ref), + "ref_name": "", + "ref_type": "", + "head_ref": "", + "base_ref": "", + "token": "", + "repository_owner": fmt.Sprint(run.Repo.OwnerName), + "retention_days": "", + }) + + task := &runnerv1.Task{ + Id: t.ID, + WorkflowPayload: job.WorkflowPayload, + Context: taskContext, + Secrets: nil, // TODO: query secrets + } + return task, true, nil +} diff --git a/routers/api/bots/runner/unary.go b/routers/api/bots/runner/unary.go new file mode 100644 index 0000000000..005db2103c --- /dev/null +++ b/routers/api/bots/runner/unary.go @@ -0,0 +1,48 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package runner + +import ( + "context" + "strings" + + "code.gitea.io/gitea/models/bots" + + "github.com/bufbuild/connect-go" +) + +var WithRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unaryFunc connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) { + if methodName(request) == "Register" { + return unaryFunc(ctx, request) + } + uuid := request.Header().Get("X-Runner-Token") // TODO: shouldn't be X-Runner-Token, maybe X-Runner-UUID + // TODO: get runner from db, refuse request if it doesn't exist + r := &bots.Runner{ + UUID: uuid, + } + ctx = context.WithValue(ctx, runnerCtxKey{}, r) + return unaryFunc(ctx, request) + } +})) + +func methodName(req connect.AnyRequest) string { + splits := strings.Split(req.Spec().Procedure, "/") + if len(splits) > 0 { + return splits[len(splits)-1] + } + return "" +} + +type runnerCtxKey struct{} + +func GetRunner(ctx context.Context) *bots.Runner { + if v := ctx.Value(runnerCtxKey{}); v != nil { + if r, ok := v.(*bots.Runner); ok { + return r + } + } + return nil +}