diff --git a/core/scheduler.go b/core/scheduler.go index afbceecc5f..966d558a94 100644 --- a/core/scheduler.go +++ b/core/scheduler.go @@ -20,8 +20,8 @@ type Filter struct { // Scheduler schedules Build stages for execution. type Scheduler interface { // Schedule schedules the stage for execution. - Schedule(context.Context, *runnerv1.Stage) error + Schedule(context.Context, *runnerv1.Task) error // Request requests the next stage scheduled for execution. - Request(context.Context, Filter) (*runnerv1.Stage, error) + Request(context.Context, Filter) (*runnerv1.Task, error) } diff --git a/go.mod b/go.mod index a8bb4b63fb..a09b46f928 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( code.gitea.io/gitea-vet v0.2.2-0.20220122151748-48ebc902541b code.gitea.io/sdk/gitea v0.15.1 codeberg.org/gusted/mcaptcha v0.0.0-20220723083913-4f3072e1d570 - gitea.com/gitea/proto-go v0.0.0-20220921035813-6a5980d5b2a2 + gitea.com/gitea/proto-go v0.0.0-20221010043111-8f5efb3ec51d gitea.com/go-chi/binding v0.0.0-20220309004920-114340dabecb gitea.com/go-chi/cache v0.2.0 gitea.com/go-chi/captcha v0.0.0-20211013065431-70641c1a35d5 diff --git a/go.sum b/go.sum index 828e33cdf2..4a50d987ab 100644 --- a/go.sum +++ b/go.sum @@ -83,8 +83,10 @@ git.sr.ht/~mariusor/go-xsd-duration v0.0.0-20220703122237-02e73435a078 h1:cliQ4H git.sr.ht/~mariusor/go-xsd-duration v0.0.0-20220703122237-02e73435a078/go.mod h1:g/V2Hjas6Z1UHUp4yIx6bATpNzJ7DYtD0FG3+xARWxs= gitea.com/gitea/act v0.0.0-20221008102131-d89ab14fb580 h1:F/VSl4oP5Gqe0FQ2i6BX/ODIhBc/cxfp5p5yA8pOSb4= gitea.com/gitea/act v0.0.0-20221008102131-d89ab14fb580/go.mod h1:lpzib6X73FHLSaTqTakan1xcsCAVhlZvPSpLns7jkRo= -gitea.com/gitea/proto-go v0.0.0-20220921035813-6a5980d5b2a2 h1:SKAZ2Fnay8Yx1Yw1BSU/ATEk3/WAFL/lMXhvBBVzyu4= -gitea.com/gitea/proto-go v0.0.0-20220921035813-6a5980d5b2a2/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/proto-go v0.0.0-20221008030753-07d7bbcfb9f5 h1:tCf76mJIGJkIvFCOFy6O30diGdA5zt07EUSZangUAzk= +gitea.com/gitea/proto-go v0.0.0-20221008030753-07d7bbcfb9f5/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/proto-go v0.0.0-20221010043111-8f5efb3ec51d h1:/c5HiuU195m2HR7VhJV11CnwpEWWXkgYi02Z6SU37ks= +gitea.com/gitea/proto-go v0.0.0-20221010043111-8f5efb3ec51d/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= gitea.com/go-chi/binding v0.0.0-20220309004920-114340dabecb h1:Yy0Bxzc8R2wxiwXoG/rECGplJUSpXqCsog9PuJFgiHs= gitea.com/go-chi/binding v0.0.0-20220309004920-114340dabecb/go.mod h1:77TZu701zMXWJFvB8gvTbQ92zQ3DQq/H7l5wAEjQRKc= gitea.com/go-chi/cache v0.0.0-20210110083709-82c4c9ce2d5e/go.mod h1:k2V/gPDEtXGjjMGuBJiapffAXTv76H4snSmlJRLUhH0= diff --git a/models/bots/runner.go b/models/bots/runner.go index ef7f5d3773..cba459e017 100644 --- a/models/bots/runner.go +++ b/models/bots/runner.go @@ -35,9 +35,6 @@ type Runner struct { ID int64 UUID string `xorm:"CHAR(36) UNIQUE"` Name string `xorm:"VARCHAR(32) UNIQUE"` - OS string `xorm:"VARCHAR(16) index"` // the runner running os - Arch string `xorm:"VARCHAR(16) index"` // the runner running architecture - Type string `xorm:"VARCHAR(16)"` OwnerID int64 `xorm:"index"` // org level runner, 0 means system Owner *user_model.User `xorm:"-"` RepoID int64 `xorm:"index"` // repo level runner, if orgid also is zero, then it's a global @@ -46,9 +43,14 @@ type Runner struct { Base int // 0 native 1 docker 2 virtual machine RepoRange string // glob match which repositories could use this runner Token string - Capacity int64 - LastOnline timeutil.TimeStamp `xorm:"index"` - Created timeutil.TimeStamp `xorm:"created"` + + // Store OS and Artch. + AgentLabels []string + // Store custom labes use defined. + CustomLabels []string + + LastOnline timeutil.TimeStamp `xorm:"index"` + Created timeutil.TimeStamp `xorm:"created"` } func (Runner) TableName() string { diff --git a/routers/api/bots/runner/runner.go b/routers/api/bots/runner/runner.go index abb7caa949..7dbd4d5ca0 100644 --- a/routers/api/bots/runner/runner.go +++ b/routers/api/bots/runner/runner.go @@ -10,7 +10,6 @@ import ( "code.gitea.io/gitea/core" bots_model "code.gitea.io/gitea/models/bots" - repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/log" runnerv1 "gitea.com/gitea/proto-go/runner/v1" "gitea.com/gitea/proto-go/runner/v1/runnerv1connect" @@ -18,6 +17,8 @@ import ( "github.com/bufbuild/connect-go" ) +var _ runnerv1connect.RunnerServiceClient = (*Service)(nil) + type Service struct { Scheduler core.Scheduler @@ -38,25 +39,24 @@ func (s *Service) Register( return nil, errors.New("missing runner token") } + // TODO: Get token data from runner_token table runner, err := bots_model.GetRunnerByToken(token) if err != nil { return nil, errors.New("runner not found") } // update runner information - runner.Arch = req.Msg.Arch - runner.OS = req.Msg.Os - runner.Capacity = req.Msg.Capacity - if err := bots_model.UpdateRunner(ctx, runner, []string{"arch", "os", "capacity"}...); err != nil { + runner.AgentLabels = req.Msg.AgentLabels + runner.CustomLabels = req.Msg.CustomLabels + runner.Name = req.Msg.Name + if err := bots_model.UpdateRunner(ctx, runner, []string{"name", "agent_labels", "custom_labels"}...); err != nil { return nil, errors.New("can't update runner") } res := connect.NewResponse(&runnerv1.RegisterResponse{ Runner: &runnerv1.Runner{ - Uuid: runner.UUID, - Os: req.Msg.Os, - Arch: req.Msg.Arch, - Capacity: req.Msg.Capacity, + Uuid: runner.UUID, + Token: runner.Token, }, }) @@ -64,15 +64,13 @@ func (s *Service) Register( } // Request requests the next available build stage for execution. -func (s *Service) Request( +func (s *Service) FetchTask( ctx context.Context, - req *connect.Request[runnerv1.RequestRequest], -) (*connect.Response[runnerv1.RequestResponse], error) { + req *connect.Request[runnerv1.FetchTaskRequest], +) (*connect.Response[runnerv1.FetchTaskResponse], error) { log.Debug("manager: request queue item") - stage, err := s.Scheduler.Request(ctx, core.Filter{ - Kind: req.Msg.Kind, - Type: req.Msg.Type, + task, err := s.Scheduler.Request(ctx, core.Filter{ OS: req.Msg.Os, Arch: req.Msg.Arch, }) @@ -85,86 +83,29 @@ func (s *Service) Request( return nil, err } - res := connect.NewResponse(&runnerv1.RequestResponse{ - Stage: stage, + // TODO: update task and check data lock + task.Machine = req.Msg.Os + + res := connect.NewResponse(&runnerv1.FetchTaskResponse{ + Task: task, }) return res, nil } -// Details fetches build details -func (s *Service) Detail( +// UpdateTask updates the task status. +func (s *Service) UpdateTask( ctx context.Context, - req *connect.Request[runnerv1.DetailRequest], -) (*connect.Response[runnerv1.DetailResponse], error) { - log.Info("stag id %d", req.Msg.Stage.Id) - - // fetch stage data - stage, err := bots_model.GetStageByID(req.Msg.Stage.Id) - if err != nil { - return nil, err - } - - stage.Machine = req.Msg.Stage.Machine - stage.Status = core.StatusPending - - count, err := bots_model.UpdateBuildStage(stage, "machine", "status") - if err != nil { - return nil, err - } - if count != 1 { - return nil, core.ErrDataLock - } - - // fetch build data - build, err := bots_model.GetBuildByID(stage.BuildID) - if err != nil { - return nil, err - } - - // fetch repo data - repo, err := repo_model.GetRepositoryByID(build.RepoID) - if err != nil { - return nil, err - } - - res := connect.NewResponse(&runnerv1.DetailResponse{ - Stage: &runnerv1.Stage{ - Id: stage.ID, - BuildId: stage.BuildID, - Name: stage.Name, - Kind: stage.Kind, - Type: stage.Type, - Status: string(stage.Status), - Started: int64(stage.Started), - Stopped: int64(stage.Stopped), - Machine: stage.Machine, - }, - Build: &runnerv1.Build{ - Id: build.ID, - Name: build.Name, - }, - Repo: &runnerv1.Repo{ - Id: repo.ID, - Name: repo.Name, - }, - }) + req *connect.Request[runnerv1.UpdateTaskRequest], +) (*connect.Response[runnerv1.UpdateTaskResponse], error) { + res := connect.NewResponse(&runnerv1.UpdateTaskResponse{}) return res, nil } -// Update updates the build stage. -func (s *Service) Update( +// UpdateLog uploads log of the task. +func (s *Service) UpdateLog( ctx context.Context, - req *connect.Request[runnerv1.UpdateRequest], -) (*connect.Response[runnerv1.UpdateResponse], error) { - res := connect.NewResponse(&runnerv1.UpdateResponse{}) - return res, nil -} - -// UpdateStep updates the build step. -func (s *Service) UpdateStep( - ctx context.Context, - req *connect.Request[runnerv1.UpdateStepRequest], -) (*connect.Response[runnerv1.UpdateStepResponse], error) { - res := connect.NewResponse(&runnerv1.UpdateStepResponse{}) + req *connect.Request[runnerv1.UpdateLogRequest], +) (*connect.Response[runnerv1.UpdateLogResponse], error) { + res := connect.NewResponse(&runnerv1.UpdateLogResponse{}) return res, nil } diff --git a/routers/api/bots/scheduler/queue/queue.go b/routers/api/bots/scheduler/queue/queue.go index 66730f55a7..95495be146 100644 --- a/routers/api/bots/scheduler/queue/queue.go +++ b/routers/api/bots/scheduler/queue/queue.go @@ -19,7 +19,7 @@ type worker struct { typ string os string arch string - channel chan *runnerv1.Stage + channel chan *runnerv1.Task } type queue struct { @@ -32,7 +32,7 @@ type queue struct { ctx context.Context } -func (q *queue) Schedule(ctx context.Context, stage *runnerv1.Stage) error { +func (q *queue) Schedule(ctx context.Context, stage *runnerv1.Task) error { select { case q.ready <- struct{}{}: default: @@ -40,13 +40,13 @@ func (q *queue) Schedule(ctx context.Context, stage *runnerv1.Stage) error { return nil } -func (q *queue) Request(ctx context.Context, params core.Filter) (*runnerv1.Stage, error) { +func (q *queue) Request(ctx context.Context, params core.Filter) (*runnerv1.Task, error) { w := &worker{ kind: params.Kind, typ: params.Type, os: params.OS, arch: params.Arch, - channel: make(chan *runnerv1.Stage), + channel: make(chan *runnerv1.Task), } q.Lock() q.workers[w] = struct{}{} @@ -123,15 +123,15 @@ func (q *queue) signal(ctx context.Context) error { } } - stage := &runnerv1.Stage{ - Id: item.ID, - BuildId: item.BuildID, - Name: item.Name, - Kind: item.Name, - Type: item.Type, - Status: string(item.Status), - Started: int64(item.Started), - Stopped: int64(item.Stopped), + stage := &runnerv1.Task{ + Id: item.ID, + // BuildId: item.BuildID, + // Name: item.Name, + // Kind: item.Name, + // Type: item.Type, + // Status: string(item.Status), + // Started: int64(item.Started), + // Stopped: int64(item.Stopped), } w.channel <- stage