fix: avoid assigning job to multiple runners

This commit is contained in:
Jason Song 2022-10-24 15:55:25 +08:00
parent 44ee3fe550
commit ec673c0e79
2 changed files with 21 additions and 11 deletions

View File

@ -10,6 +10,7 @@ import (
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
"xorm.io/builder"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
) )
@ -90,31 +91,37 @@ func GetRunJobsByRunID(ctx context.Context, runID int64) ([]*RunJob, error) {
return jobs, nil return jobs, nil
} }
func UpdateRunJob(ctx context.Context, job *RunJob, cols ...string) error { func UpdateRunJob(ctx context.Context, job *RunJob, cond builder.Cond, cols ...string) (int64, error) {
e := db.GetEngine(ctx) e := db.GetEngine(ctx)
sess := e.ID(job.ID) sess := e.ID(job.ID)
if len(cols) > 0 { if len(cols) > 0 {
sess.Cols(cols...) sess.Cols(cols...)
} }
if _, err := sess.Update(job); err != nil {
return err if cond != nil {
sess.Where(cond)
} }
if !(slices.Contains(cols, "status") || job.Status != 0) { affected, err := sess.Update(job)
return nil if err != nil {
return 0, err
}
if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) {
return affected, nil
} }
if job.RunID == 0 { if job.RunID == 0 {
var err error var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil { if job, err = GetRunJobByID(ctx, job.ID); err != nil {
return err return affected, err
} }
} }
jobs, err := GetRunJobsByRunID(ctx, job.RunID) jobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil { if err != nil {
return err return affected, err
} }
runStatus := aggregateJobStatus(jobs) runStatus := aggregateJobStatus(jobs)
@ -126,7 +133,7 @@ func UpdateRunJob(ctx context.Context, job *RunJob, cols ...string) error {
if runStatus.IsDone() { if runStatus.IsDone() {
run.Stopped = timeutil.TimeStampNow() run.Stopped = timeutil.TimeStampNow()
} }
return UpdateRun(ctx, run) return affected, UpdateRun(ctx, run)
} }
func aggregateJobStatus(jobs []*RunJob) Status { func aggregateJobStatus(jobs []*RunJob) Status {

View File

@ -16,6 +16,7 @@ import (
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
runnerv1 "gitea.com/gitea/proto-go/runner/v1" runnerv1 "gitea.com/gitea/proto-go/runner/v1"
"xorm.io/builder"
"github.com/nektos/act/pkg/jobparser" "github.com/nektos/act/pkg/jobparser"
) )
@ -255,8 +256,10 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
task.Steps = steps task.Steps = steps
job.TaskID = task.ID job.TaskID = task.ID
if err := UpdateRunJob(ctx, job); err != nil { if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
return nil, false, err return nil, false, err
} else if n != 1 {
return nil, false, nil
} }
if job.Run.Status.IsWaiting() { if job.Run.Status.IsWaiting() {
@ -308,11 +311,11 @@ func UpdateTaskByState(state *runnerv1.TaskState) error {
if task.Result != runnerv1.Result_RESULT_UNSPECIFIED { if task.Result != runnerv1.Result_RESULT_UNSPECIFIED {
task.Status = Status(task.Result) task.Status = Status(task.Result)
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix()) task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
if err := UpdateRunJob(ctx, &RunJob{ if _, err := UpdateRunJob(ctx, &RunJob{
ID: task.JobID, ID: task.JobID,
Status: task.Status, Status: task.Status,
Stopped: task.Stopped, Stopped: task.Stopped,
}); err != nil { }, nil); err != nil {
return err return err
} }
} }