From d2f8d5ded1aadc559c93038c580ac2bed81ffdf7 Mon Sep 17 00:00:00 2001 From: Jason Song <i@wolfogre.com> Date: Wed, 19 Oct 2022 14:54:11 +0800 Subject: [PATCH] fix: use file name for log --- models/bots/task.go | 32 +++++++++++++++---------- modules/bots/log.go | 40 ++++--------------------------- routers/api/bots/runner/runner.go | 8 +------ routers/web/dev/buildview.go | 2 +- 4 files changed, 27 insertions(+), 55 deletions(-) diff --git a/models/bots/task.go b/models/bots/task.go index 20840ad474..0f4ef3724e 100644 --- a/models/bots/task.go +++ b/models/bots/task.go @@ -33,11 +33,12 @@ type Task struct { Started timeutil.TimeStamp Stopped timeutil.TimeStamp - LogURL string // dbfs:///a/b.log or s3://endpoint.com/a/b.log and etc. - LogLength int64 // lines count - LogSize int64 // blob size - LogIndexes *LogIndexes `xorm:"BLOB"` // line number to offset - LogExpired bool + LogFilename string // file name of log + LogInStorage bool // read log from database or from storage + LogLength int64 // lines count + LogSize int64 // blob size + LogIndexes *LogIndexes `xorm:"BLOB"` // line number to offset + LogExpired bool // files that are too old will be deleted Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated"` @@ -167,8 +168,10 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) { } defer commiter.Close() + e := db.GetEngine(ctx) + var jobs []*RunJob - if err := db.GetEngine(ctx).Where("task_id=? AND ready=?", 0, true).OrderBy("id").Find(&jobs); err != nil { + if err := e.Where("task_id=? AND ready=?", 0, true).OrderBy("id").Find(&jobs); err != nil { return nil, false, err } @@ -185,6 +188,9 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) { if job == nil { return nil, false, nil } + if err := job.LoadAttributes(ctx); err != nil { + return nil, false, err + } now := timeutil.TimeStampNow() job.Attempt++ @@ -207,7 +213,12 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) { _, wolkflowJob = gots[0].Job() } - if err := db.Insert(ctx, task); err != nil { + if _, err := e.Insert(ctx, task); err != nil { + return nil, false, err + } + + task.LogFilename = fmt.Sprintf("%s/%d.log", job.Run.Repo.FullName(), task.ID) + if _, err := e.ID(task.ID).Cols("log_filename").Update(task); err != nil { return nil, false, err } @@ -219,20 +230,17 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) { Number: int64(i), } } - if err := db.Insert(ctx, steps); err != nil { + if _, err := e.Insert(ctx, steps); err != nil { return nil, false, err } task.Steps = steps job.TaskID = task.ID - if _, err := db.GetEngine(ctx).ID(job.ID).Update(job); err != nil { + if _, err := e.ID(job.ID).Update(job); err != nil { return nil, false, err } task.Job = job - if err := task.Job.LoadAttributes(ctx); err != nil { - return nil, false, err - } if err := commiter.Commit(); err != nil { return nil, false, err diff --git a/modules/bots/log.go b/modules/bots/log.go index 2a8a55a5fd..eb1e2e8d01 100644 --- a/modules/bots/log.go +++ b/modules/bots/log.go @@ -9,7 +9,6 @@ import ( "context" "fmt" "io" - "net/url" "os" "strings" "time" @@ -22,23 +21,14 @@ import ( const ( MaxLineSize = 64 * 1024 + DBFSPrefix = "bots_tasks/" timeFormat = time.RFC3339Nano defaultBufSize = 64 * 1024 ) -const ( - StorageSchemaDBFS = "dbfs" - StorageSchemaFile = "file" - StorageSchemaS3 = "s3" - // ... -) - -func WriteLogs(ctx context.Context, rawURL string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { - name, err := parseDBFSName(rawURL) - if err != nil { - return nil, err - } +func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { + name := DBFSPrefix + filename f, err := dbfs.OpenFile(ctx, name, os.O_WRONLY|os.O_CREATE) if err != nil { return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) @@ -65,12 +55,8 @@ func WriteLogs(ctx context.Context, rawURL string, offset int64, rows []*runnerv return ns, nil } -func ReadLogs(ctx context.Context, rawURL string, offset int64, limit int64) ([]*runnerv1.LogRow, error) { - name, err := parseDBFSName(rawURL) - if err != nil { - return nil, err - } - +func ReadLogs(ctx context.Context, filename string, offset int64, limit int64) ([]*runnerv1.LogRow, error) { + name := DBFSPrefix + filename f, err := dbfs.Open(ctx, name) if err != nil { return nil, fmt.Errorf("dbfs Open %q: %w", name, err) @@ -103,22 +89,6 @@ func ReadLogs(ctx context.Context, rawURL string, offset int64, limit int64) ([] return rows, nil } -func parseDBFSName(rawURL string) (string, error) { - u, err := url.Parse(rawURL) - if err != nil { - return "", fmt.Errorf("invalid url: %w", err) - } - if u.Scheme != StorageSchemaDBFS { - return "", fmt.Errorf("%s supported only yet", StorageSchemaDBFS) - } - - if u.Path == "" { - return "", fmt.Errorf("empty path") - } - - return u.Path, nil -} - func FormatLog(timestamp time.Time, content string) string { // Content shouldn't contain new line, it will break log indexes, other control chars are safe. content = strings.ReplaceAll(content, "\n", `\n`) diff --git a/routers/api/bots/runner/runner.go b/routers/api/bots/runner/runner.go index dbc779954f..754af9706b 100644 --- a/routers/api/bots/runner/runner.go +++ b/routers/api/bots/runner/runner.go @@ -167,12 +167,6 @@ func (s *Service) UpdateLog( if err != nil { return nil, status.Errorf(codes.Internal, "get task: %v", err) } - if task.LogURL == "" { - task.LogURL = fmt.Sprintf("dbfs:///bots/tasks/%d.log", task.ID) - if err := bots_model.UpdateTask(ctx, task, "log_url"); err != nil { - return nil, status.Errorf(codes.Internal, "update task: %v", err) - } - } ack := task.LogLength if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack { @@ -181,7 +175,7 @@ func (s *Service) UpdateLog( } rows := req.Msg.Rows[ack-req.Msg.Index:] - ns, err := bots.WriteLogs(ctx, task.LogURL, task.LogSize, rows) + ns, err := bots.WriteLogs(ctx, task.LogFilename, task.LogSize, rows) if err != nil { return nil, status.Errorf(codes.Internal, "write logs: %v", err) } diff --git a/routers/web/dev/buildview.go b/routers/web/dev/buildview.go index d1c28b7964..c791426dc5 100644 --- a/routers/web/dev/buildview.go +++ b/routers/web/dev/buildview.go @@ -180,7 +180,7 @@ func BuildViewPost(ctx *context.Context) { index := step.LogIndex + cursor.Cursor length := step.LogLength - cursor.Cursor offset := (*task.LogIndexes)[index] - logRows, err = bots.ReadLogs(ctx, task.LogURL, offset, length) + logRows, err = bots.ReadLogs(ctx, task.LogFilename, offset, length) if err != nil { ctx.Error(http.StatusInternalServerError, err.Error()) return