fix: use file name for log

This commit is contained in:
Jason Song 2022-10-19 14:54:11 +08:00
parent e6ad1b3233
commit d2f8d5ded1
4 changed files with 27 additions and 55 deletions

View File

@ -33,11 +33,12 @@ type Task struct {
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
LogURL string // dbfs:///a/b.log or s3://endpoint.com/a/b.log and etc.
LogLength int64 // lines count
LogSize int64 // blob size
LogIndexes *LogIndexes `xorm:"BLOB"` // line number to offset
LogExpired bool
LogFilename string // file name of log
LogInStorage bool // read log from database or from storage
LogLength int64 // lines count
LogSize int64 // blob size
LogIndexes *LogIndexes `xorm:"BLOB"` // line number to offset
LogExpired bool // files that are too old will be deleted
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
@ -167,8 +168,10 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
}
defer commiter.Close()
e := db.GetEngine(ctx)
var jobs []*RunJob
if err := db.GetEngine(ctx).Where("task_id=? AND ready=?", 0, true).OrderBy("id").Find(&jobs); err != nil {
if err := e.Where("task_id=? AND ready=?", 0, true).OrderBy("id").Find(&jobs); err != nil {
return nil, false, err
}
@ -185,6 +188,9 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
if job == nil {
return nil, false, nil
}
if err := job.LoadAttributes(ctx); err != nil {
return nil, false, err
}
now := timeutil.TimeStampNow()
job.Attempt++
@ -207,7 +213,12 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
_, wolkflowJob = gots[0].Job()
}
if err := db.Insert(ctx, task); err != nil {
if _, err := e.Insert(ctx, task); err != nil {
return nil, false, err
}
task.LogFilename = fmt.Sprintf("%s/%d.log", job.Run.Repo.FullName(), task.ID)
if _, err := e.ID(task.ID).Cols("log_filename").Update(task); err != nil {
return nil, false, err
}
@ -219,20 +230,17 @@ func CreateTaskForRunner(runner *Runner) (*Task, bool, error) {
Number: int64(i),
}
}
if err := db.Insert(ctx, steps); err != nil {
if _, err := e.Insert(ctx, steps); err != nil {
return nil, false, err
}
task.Steps = steps
job.TaskID = task.ID
if _, err := db.GetEngine(ctx).ID(job.ID).Update(job); err != nil {
if _, err := e.ID(job.ID).Update(job); err != nil {
return nil, false, err
}
task.Job = job
if err := task.Job.LoadAttributes(ctx); err != nil {
return nil, false, err
}
if err := commiter.Commit(); err != nil {
return nil, false, err

View File

@ -9,7 +9,6 @@ import (
"context"
"fmt"
"io"
"net/url"
"os"
"strings"
"time"
@ -22,23 +21,14 @@ import (
const (
MaxLineSize = 64 * 1024
DBFSPrefix = "bots_tasks/"
timeFormat = time.RFC3339Nano
defaultBufSize = 64 * 1024
)
const (
StorageSchemaDBFS = "dbfs"
StorageSchemaFile = "file"
StorageSchemaS3 = "s3"
// ...
)
func WriteLogs(ctx context.Context, rawURL string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
name, err := parseDBFSName(rawURL)
if err != nil {
return nil, err
}
func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
name := DBFSPrefix + filename
f, err := dbfs.OpenFile(ctx, name, os.O_WRONLY|os.O_CREATE)
if err != nil {
return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
@ -65,12 +55,8 @@ func WriteLogs(ctx context.Context, rawURL string, offset int64, rows []*runnerv
return ns, nil
}
func ReadLogs(ctx context.Context, rawURL string, offset int64, limit int64) ([]*runnerv1.LogRow, error) {
name, err := parseDBFSName(rawURL)
if err != nil {
return nil, err
}
func ReadLogs(ctx context.Context, filename string, offset int64, limit int64) ([]*runnerv1.LogRow, error) {
name := DBFSPrefix + filename
f, err := dbfs.Open(ctx, name)
if err != nil {
return nil, fmt.Errorf("dbfs Open %q: %w", name, err)
@ -103,22 +89,6 @@ func ReadLogs(ctx context.Context, rawURL string, offset int64, limit int64) ([]
return rows, nil
}
func parseDBFSName(rawURL string) (string, error) {
u, err := url.Parse(rawURL)
if err != nil {
return "", fmt.Errorf("invalid url: %w", err)
}
if u.Scheme != StorageSchemaDBFS {
return "", fmt.Errorf("%s supported only yet", StorageSchemaDBFS)
}
if u.Path == "" {
return "", fmt.Errorf("empty path")
}
return u.Path, nil
}
func FormatLog(timestamp time.Time, content string) string {
// Content shouldn't contain new line, it will break log indexes, other control chars are safe.
content = strings.ReplaceAll(content, "\n", `\n`)

View File

@ -167,12 +167,6 @@ func (s *Service) UpdateLog(
if err != nil {
return nil, status.Errorf(codes.Internal, "get task: %v", err)
}
if task.LogURL == "" {
task.LogURL = fmt.Sprintf("dbfs:///bots/tasks/%d.log", task.ID)
if err := bots_model.UpdateTask(ctx, task, "log_url"); err != nil {
return nil, status.Errorf(codes.Internal, "update task: %v", err)
}
}
ack := task.LogLength
if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
@ -181,7 +175,7 @@ func (s *Service) UpdateLog(
}
rows := req.Msg.Rows[ack-req.Msg.Index:]
ns, err := bots.WriteLogs(ctx, task.LogURL, task.LogSize, rows)
ns, err := bots.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
if err != nil {
return nil, status.Errorf(codes.Internal, "write logs: %v", err)
}

View File

@ -180,7 +180,7 @@ func BuildViewPost(ctx *context.Context) {
index := step.LogIndex + cursor.Cursor
length := step.LogLength - cursor.Cursor
offset := (*task.LogIndexes)[index]
logRows, err = bots.ReadLogs(ctx, task.LogURL, offset, length)
logRows, err = bots.ReadLogs(ctx, task.LogFilename, offset, length)
if err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
return