feat: tranfer log to storage

This commit is contained in:
Jason Song 2022-10-19 15:49:23 +08:00
parent d2f8d5ded1
commit 8dbe30dff9
8 changed files with 129 additions and 46 deletions

View File

@ -213,7 +213,7 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
_, wolkflowJob = gots[0].Job()
}
if _, err := e.Insert(ctx, task); err != nil {
if _, err := e.Insert(task); err != nil {
return nil, false, err
}
@ -230,7 +230,7 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
Number: int64(i),
}
}
if _, err := e.Insert(ctx, steps); err != nil {
if _, err := e.Insert(steps); err != nil {
return nil, false, err
}
task.Steps = steps

View File

@ -94,20 +94,21 @@ func addBotTables(x *xorm.Engine) error {
type BotsRunIndex db.ResourceIndex
type BotsTask struct {
ID int64
JobID int64
Attempt int64
RunnerID int64 `xorm:"index"`
Result int32
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
LogURL string // dbfs:///a/b.log or s3://endpoint.com/a/b.log and etc.
LogLength int64 // lines count
LogSize int64 // blob size
LogIndexes *[]int64 `xorm:"BLOB"` // line number to offset
LogExpired bool
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
ID int64
JobID int64
Attempt int64
RunnerID int64 `xorm:"index"`
Result int32
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
LogFilename string // file name of log
LogInStorage bool // read log from database or from storage
LogLength int64 // lines count
LogSize int64 // blob size
LogIndexes *[]int64 `xorm:"BLOB"` // line number to offset
LogExpired bool
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
type BotsTaskStep struct {

View File

@ -14,6 +14,8 @@ import (
"time"
"code.gitea.io/gitea/models/dbfs"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/storage"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
"google.golang.org/protobuf/types/known/timestamppb"
@ -55,15 +57,15 @@ func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runne
return ns, nil
}
func ReadLogs(ctx context.Context, filename string, offset int64, limit int64) ([]*runnerv1.LogRow, error) {
name := DBFSPrefix + filename
f, err := dbfs.Open(ctx, name)
func ReadLogs(ctx context.Context, inStorage bool, filename string, offset int64, limit int64) ([]*runnerv1.LogRow, error) {
f, err := openLogs(ctx, inStorage, filename)
if err != nil {
return nil, fmt.Errorf("dbfs Open %q: %w", name, err)
return nil, err
}
defer f.Close()
if _, err := f.Seek(offset, io.SeekStart); err != nil {
return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
return nil, fmt.Errorf("file seek: %w", err)
}
scanner := bufio.NewScanner(f)
@ -89,6 +91,41 @@ func ReadLogs(ctx context.Context, filename string, offset int64, limit int64) (
return rows, nil
}
func TransferLogs(ctx context.Context, filename string) (func(), error) {
name := DBFSPrefix + filename
remove := func() {
if err := dbfs.Remove(ctx, name); err != nil {
log.Warn("dbfs remove %q: %v", name, err)
}
}
f, err := dbfs.Open(ctx, name)
if err != nil {
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
}
defer f.Close()
if _, err := storage.Builds.Save(filename, f, -1); err != nil {
return nil, fmt.Errorf("storage save %q: %w", filename, err)
}
return remove, nil
}
func openLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeekCloser, error) {
if !inStorage {
name := DBFSPrefix + filename
f, err := dbfs.Open(ctx, name)
if err != nil {
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
}
return f, nil
}
f, err := storage.Builds.Open(filename)
if err != nil {
return nil, fmt.Errorf("storage open %q: %w", filename, err)
}
return f, nil
}
func FormatLog(timestamp time.Time, content string) string {
// Content shouldn't contain new line, it will break log indexes, other control chars are safe.
content = strings.ReplaceAll(content, "\n", `\n`)

28
modules/setting/build.go Normal file
View File

@ -0,0 +1,28 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package setting
import (
"code.gitea.io/gitea/modules/log"
)
// Builds settings
var (
Builds = struct {
Storage
Enabled bool
}{
Enabled: true,
}
)
func newBuilds() {
sec := Cfg.Section("builds")
if err := sec.MapTo(&Builds); err != nil {
log.Fatal("Failed to map Builds settings: %v", err)
}
Builds.Storage = getStorage("builds", "", nil)
}

View File

@ -1062,6 +1062,8 @@ func loadFromConf(allowEmpty bool, extraConfig string) {
newPackages()
newBuilds()
if err = Cfg.Section("ui").MapTo(&UI); err != nil {
log.Fatal("Failed to map UI settings: %v", err)
} else if err = Cfg.Section("markdown").MapTo(&Markdown); err != nil {

View File

@ -126,31 +126,27 @@ var (
// Packages represents packages storage
Packages ObjectStorage
// Builds represents builds storage
Builds ObjectStorage
)
// Init init the stoarge
func Init() error {
if err := initAttachments(); err != nil {
return err
for _, f := range []func() error{
initAttachments,
initAvatars,
initRepoAvatars,
initLFS,
initRepoArchives,
initPackages,
initBuilds,
} {
if err := f(); err != nil {
return err
}
}
if err := initAvatars(); err != nil {
return err
}
if err := initRepoAvatars(); err != nil {
return err
}
if err := initLFS(); err != nil {
return err
}
if err := initRepoArchives(); err != nil {
return err
}
return initPackages()
return nil
}
// NewStorage takes a storage type and some config and returns an ObjectStorage or an error
@ -201,3 +197,9 @@ func initPackages() (err error) {
Packages, err = NewStorage(setting.Packages.Storage.Type, &setting.Packages.Storage)
return err
}
func initBuilds() (err error) {
log.Info("Initialising Builds storage with type: %s", setting.Builds.Storage.Type)
Builds, err = NewStorage(setting.Builds.Storage.Type, &setting.Builds.Storage)
return err
}

View File

@ -174,6 +174,10 @@ func (s *Service) UpdateLog(
return res, nil
}
if task.LogInStorage {
return nil, status.Errorf(codes.AlreadyExists, "log file has been archived")
}
rows := req.Msg.Rows[ack-req.Msg.Index:]
ns, err := bots.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
if err != nil {
@ -187,14 +191,23 @@ func (s *Service) UpdateLog(
*task.LogIndexes = append(*task.LogIndexes, task.LogSize)
task.LogSize += int64(n)
}
if err := bots_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size"); err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
res.Msg.AckIndex = task.LogLength
var remove func()
if req.Msg.NoMore {
// TODO: transfer logs to storage from db
task.LogInStorage = true
remove, err = bots.TransferLogs(ctx, task.LogFilename)
if err != nil {
return nil, status.Errorf(codes.Internal, "transfer logs: %v", err)
}
}
if err := bots_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
if remove != nil {
remove()
}
return res, nil

View File

@ -180,7 +180,7 @@ func BuildViewPost(ctx *context.Context) {
index := step.LogIndex + cursor.Cursor
length := step.LogLength - cursor.Cursor
offset := (*task.LogIndexes)[index]
logRows, err = bots.ReadLogs(ctx, task.LogFilename, offset, length)
logRows, err = bots.ReadLogs(ctx, task.LogInStorage, task.LogFilename, offset, length)
if err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
return