feat: receive task logs

This commit is contained in:
Jason Song 2022-10-12 17:58:36 +08:00
parent 7497155d14
commit 848a1ae309
2 changed files with 67 additions and 13 deletions

View File

@ -8,14 +8,15 @@ import (
"fmt"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
)
// TaskLog represents a task's log, every task has a standalone table
type TaskLog struct {
ID int64
Content string `xorm:"BINARY"`
Created timeutil.TimeStamp `xorm:"created"`
ID int64
Timestamp timeutil.TimeStamp
Content string `xorm:"LONGTEXT"`
}
func init() {
@ -26,16 +27,16 @@ func GetTaskLogTableName(taskID int64) string {
return fmt.Sprintf("bots_task_log_%d", taskID)
}
// CreateTaskLog table for a build
func CreateTaskLog(buildID int64) error {
// CreateTaskLog table for a task
func CreateTaskLog(taskID int64) error {
return db.GetEngine(db.DefaultContext).
Table(GetBuildLogTableName(buildID)).
Sync2(new(BuildLog))
Table(GetTaskLogTableName(taskID)).
Sync(new(TaskLog))
}
func GetTaskLogs(taskID, index, length int64) (logs []*TaskLog, err error) {
sess := db.GetEngine(db.DefaultContext).Table(GetBuildLogTableName(taskID)).
Where("id>=?", index)
Where("id>=?", index).OrderBy("id")
if length > 0 {
sess.Limit(int(length))
@ -45,3 +46,36 @@ func GetTaskLogs(taskID, index, length int64) (logs []*TaskLog, err error) {
return
}
func InsertTaskLogs(taskID int64, logs []*TaskLog) (int64, error) {
if err := CreateTaskLog(taskID); err != nil {
return 0, err
}
table := GetTaskLogTableName(taskID)
// TODO: A more complete way to insert logs
// Be careful:
// - the id of a log can be 0
// - some logs may already exist in db
// - if use exec, consider different databases
// - the input should be ordered by id
// - the ids should be continuously increasing
// - the min id of input should be 1 + (the max id in db)
if len(logs) == 0 {
return 0, fmt.Errorf("no logs")
}
ack := logs[0].ID - 1
sess := db.GetEngine(db.DefaultContext)
for _, v := range logs {
_, err := sess.Exec(fmt.Sprintf("INSERT IGNORE INTO %s (id, timestamp, content) VALUES (?,?,?)", table), v.ID, v.Timestamp, []byte(v.Content))
if err != nil {
log.Error("insert log %d of task %d: %v", v.ID, taskID, err)
break
}
ack = v.ID
}
return ack, nil
}

View File

@ -17,7 +17,7 @@ import (
"code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
"gitea.com/gitea/proto-go/runner/v1/runnerv1connect"
@ -180,11 +180,31 @@ func (s *Service) UpdateLog(
) (*connect.Response[runnerv1.UpdateLogResponse], error) {
res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
// to debug
for i, row := range req.Msg.Rows {
log.Info("log[%v]: %v %v", req.Msg.Index+int64(i), row.Time.AsTime().Local().Format(time.RFC3339), row.Content)
if len(req.Msg.Rows) == 0 {
// TODO: should be 1 + the max id of stored log
res.Msg.AckIndex = req.Msg.Index
return res, nil
}
rowIndex := req.Msg.Index
rows := make([]*bots_model.TaskLog, len(req.Msg.Rows))
for i, v := range req.Msg.Rows {
rows[i] = &bots_model.TaskLog{
ID: rowIndex + int64(i),
Timestamp: timeutil.TimeStamp(v.Time.AsTime().Unix()),
Content: v.Content,
}
}
ack, err := bots_model.InsertTaskLogs(req.Msg.TaskId, rows)
if err != nil {
return nil, status.Errorf(codes.Internal, "insert task log: %v", err)
}
res.Msg.AckIndex = ack
if req.Msg.NoMore {
// TODO: transfer logs to storage from db
}
res.Msg.AckIndex = req.Msg.Index + int64(len(req.Msg.Rows))
return res, nil
}