diff --git a/models/actions/run.go b/models/actions/run.go index 7b62ff884..5396c612f 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -195,6 +195,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork } runJobs := make([]*ActionRunJob, 0, len(jobs)) + var hasWaiting bool for _, v := range jobs { id, job := v.Job() needs := job.Needs() @@ -205,6 +206,8 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork status := StatusWaiting if len(needs) > 0 || run.NeedApproval { status = StatusBlocked + } else { + hasWaiting = true } job.Name, _ = util.SplitStringAtByteN(job.Name, 255) runJobs = append(runJobs, &ActionRunJob{ @@ -225,6 +228,13 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork return err } + // if there is a job in the waiting status, increase tasks version. + if hasWaiting { + if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } + } + return commiter.Commit() } diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 0002e5077..c7620cd8b 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -111,6 +111,13 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col return affected, nil } + if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() { + // if the status of job changes to waiting again, increase tasks version. + if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + return affected, err + } + } + if job.RunID == 0 { var err error if job, err = GetRunJobByID(ctx, job.ID); err != nil { diff --git a/models/actions/task.go b/models/actions/task.go index 55044ec82..9cc0fd0df 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -215,12 +215,11 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro } func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { - dbCtx, commiter, err := db.TxContext(ctx) + ctx, commiter, err := db.TxContext(ctx) if err != nil { return nil, false, err } defer commiter.Close() - ctx = dbCtx.WithContext(ctx) e := db.GetEngine(ctx) diff --git a/models/actions/tasks_version.go b/models/actions/tasks_version.go new file mode 100644 index 000000000..5c0a86538 --- /dev/null +++ b/models/actions/tasks_version.go @@ -0,0 +1,105 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" +) + +// ActionTasksVersion +// If both ownerID and repoID is zero, its scope is global. +// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currrently). +// If ownerID is zero and repoID is not zero, its scope is repo. +type ActionTasksVersion struct { + ID int64 `xorm:"pk autoincr"` + OwnerID int64 `xorm:"UNIQUE(owner_repo)"` + RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` + Version int64 + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` +} + +func init() { + db.RegisterModel(new(ActionTasksVersion)) +} + +func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) { + var tasksVersion ActionTasksVersion + has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion) + if err != nil { + return 0, err + } else if !has { + return 0, nil + } + return tasksVersion.Version, err +} + +func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) { + tasksVersion := &ActionTasksVersion{ + OwnerID: ownerID, + RepoID: repoID, + Version: 1, + } + if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil { + return nil, err + } + return tasksVersion, nil +} + +func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error { + result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID) + if err != nil { + return err + } + affected, err := result.RowsAffected() + if err != nil { + return err + } + + if affected == 0 { + // if update sql does not affect any rows, the database may be broken, + // so re-insert the row of version data here. + if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil { + return err + } + } + + return nil +} + +func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error { + ctx, commiter, err := db.TxContext(ctx) + if err != nil { + return err + } + defer commiter.Close() + + // 1. increase global + if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Global): %v", err) + return err + } + + // 2. increase owner + if ownerID > 0 { + if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil { + log.Error("IncreaseTasksVersionByScope(Owner): %v", err) + return err + } + } + + // 3. increase repo + if repoID > 0 { + if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil { + log.Error("IncreaseTasksVersionByScope(Repo): %v", err) + return err + } + } + + return commiter.Commit() +} diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 6599cb9cd..bfe4b56cd 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -515,6 +515,8 @@ var migrations = []Migration{ NewMigration("Alter Actions Artifact table", v1_21.AlterActionArtifactTable), // v266 -> v267 NewMigration("Reduce commit status", v1_21.ReduceCommitStatus), + // v267 -> v268 + NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v1_21/v267.go b/models/migrations/v1_21/v267.go new file mode 100644 index 000000000..bc0e954bd --- /dev/null +++ b/models/migrations/v1_21/v267.go @@ -0,0 +1,23 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_21 //nolint + +import ( + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/xorm" +) + +func CreateActionTasksVersionTable(x *xorm.Engine) error { + type ActionTasksVersion struct { + ID int64 `xorm:"pk autoincr"` + OwnerID int64 `xorm:"UNIQUE(owner_repo)"` + RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"` + Version int64 + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` + } + + return x.Sync(new(ActionTasksVersion)) +} diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 17801cb32..6de5964cb 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -127,20 +127,39 @@ func (s *Service) Declare( // FetchTask assigns a task to the runner func (s *Service) FetchTask( ctx context.Context, - _ *connect.Request[runnerv1.FetchTaskRequest], + req *connect.Request[runnerv1.FetchTaskRequest], ) (*connect.Response[runnerv1.FetchTaskResponse], error) { runner := GetRunner(ctx) var task *runnerv1.Task - if t, ok, err := pickTask(ctx, runner); err != nil { - log.Error("pick task failed: %v", err) - return nil, status.Errorf(codes.Internal, "pick task: %v", err) - } else if ok { - task = t + tasksVersion := req.Msg.TasksVersion // task version from runner + latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) + if err != nil { + return nil, status.Errorf(codes.Internal, "query tasks version failed: %v", err) + } else if latestVersion == 0 { + if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { + return nil, status.Errorf(codes.Internal, "fail to increase task version: %v", err) + } + // if we don't increase the value of `latestVersion` here, + // the response of FetchTask will return tasksVersion as zero. + // and the runner will treat it as an old version of Gitea. + latestVersion++ } + if tasksVersion != latestVersion { + // if the task version in request is not equal to the version in db, + // it means there may still be some tasks not be assgined. + // try to pick a task for the runner that send the request. + if t, ok, err := pickTask(ctx, runner); err != nil { + log.Error("pick task failed: %v", err) + return nil, status.Errorf(codes.Internal, "pick task: %v", err) + } else if ok { + task = t + } + } res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, + Task: task, + TasksVersion: latestVersion, }) return res, nil }