1
0
mirror of https://github.com/go-gitea/gitea.git synced 2025-04-08 17:05:45 +02:00

database-filesystem, demo log tailing in term

This commit is contained in:
wxiaoguang 2022-08-07 21:26:31 +08:00 committed by Jason Song
parent 931d8c2c21
commit 92d15afd18
8 changed files with 702 additions and 1 deletions

362
models/dbfs/dbfile.go Normal file

@ -0,0 +1,362 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package dbfs
import (
"context"
"errors"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"code.gitea.io/gitea/models/db"
)
var defaultFileBlockSize int64 = 32 * 1024
type File interface {
io.ReadWriteCloser
io.Seeker
}
type file struct {
ctx context.Context
metaID int64
fullPath string
blockSize int64
allowRead bool
allowWrite bool
offset int64
}
var _ File = (*file)(nil)
func (f *file) readAt(fileMeta *FileMeta, offset int64, p []byte) (n int, err error) {
if f.offset >= fileMeta.FileSize {
return 0, io.EOF
}
blobPos := int(offset % f.blockSize)
blobOffset := offset - int64(blobPos)
blobRemaining := int(f.blockSize) - blobPos
needRead := len(p)
if needRead > blobRemaining {
needRead = blobRemaining
}
if blobOffset+int64(blobPos)+int64(needRead) > fileMeta.FileSize {
needRead = int(fileMeta.FileSize - blobOffset - int64(blobPos))
}
if needRead <= 0 {
return 0, io.EOF
}
var fileData FileData
ok, err := db.GetEngine(f.ctx).Where("meta_id = ? AND blob_offset = ?", f.metaID, blobOffset).Get(&fileData)
if err != nil {
return 0, err
}
blobData := fileData.BlobData
if !ok {
blobData = nil
}
canCopy := len(blobData) - blobPos
if canCopy <= 0 {
canCopy = 0
}
realRead := needRead
if realRead > canCopy {
realRead = canCopy
}
if realRead > 0 {
copy(p[:realRead], fileData.BlobData[blobPos:blobPos+realRead])
}
for i := realRead; i < needRead; i++ {
p[i] = 0
}
return needRead, nil
}
func (f *file) Read(p []byte) (n int, err error) {
if f.metaID == 0 || !f.allowRead {
return 0, os.ErrInvalid
}
fileMeta, err := findFileMetaById(f.ctx, f.metaID)
if err != nil {
return 0, err
}
n, err = f.readAt(fileMeta, f.offset, p)
f.offset += int64(n)
return n, err
}
func (f *file) Write(p []byte) (n int, err error) {
if f.metaID == 0 || !f.allowWrite {
return 0, os.ErrInvalid
}
fileMeta, err := findFileMetaById(f.ctx, f.metaID)
if err != nil {
return 0, err
}
needUpdateSize := false
written := 0
for len(p) > 0 {
blobPos := int(f.offset % f.blockSize)
blobOffset := f.offset - int64(blobPos)
blobRemaining := int(f.blockSize) - blobPos
needWrite := len(p)
if needWrite > blobRemaining {
needWrite = blobRemaining
}
buf := make([]byte, f.blockSize)
readBytes, err := f.readAt(fileMeta, blobOffset, buf)
if err != nil && !errors.Is(err, io.EOF) {
return written, err
}
copy(buf[blobPos:blobPos+needWrite], p[:needWrite])
if blobPos+needWrite > readBytes {
buf = buf[:blobPos+needWrite]
} else {
buf = buf[:readBytes]
}
fileData := FileData{
MetaID: fileMeta.ID,
BlobOffset: blobOffset,
BlobData: buf,
}
if res, err := db.GetEngine(f.ctx).Exec("UPDATE file_data SET revision=revision+1, blob_data=? WHERE meta_id=? AND blob_offset=?", buf, fileMeta.ID, blobOffset); err != nil {
return written, err
} else if updated, err := res.RowsAffected(); err != nil {
return written, err
} else if updated == 0 {
if _, err = db.GetEngine(f.ctx).Insert(&fileData); err != nil {
return written, err
}
}
written += needWrite
f.offset += int64(needWrite)
if f.offset > fileMeta.FileSize {
fileMeta.FileSize = f.offset
needUpdateSize = true
}
p = p[needWrite:]
}
fileMetaUpdate := FileMeta{
ModifyTimestamp: timeToFileTimestamp(time.Now()),
}
if needUpdateSize {
fileMetaUpdate.FileSize = f.offset
}
if _, err := db.GetEngine(f.ctx).ID(fileMeta.ID).Update(fileMetaUpdate); err != nil {
return written, err
}
return written, nil
}
func (f *file) Seek(n int64, whence int) (int64, error) {
if f.metaID == 0 {
return 0, os.ErrInvalid
}
newOffset := f.offset
switch whence {
case io.SeekStart:
newOffset = n
case io.SeekCurrent:
newOffset += n
case io.SeekEnd:
size, err := f.size()
if err != nil {
return f.offset, err
}
newOffset = size + n
default:
return f.offset, os.ErrInvalid
}
if newOffset < 0 {
return f.offset, os.ErrInvalid
}
f.offset = newOffset
return newOffset, nil
}
func (f *file) Close() error {
return nil
}
func timeToFileTimestamp(t time.Time) int64 {
return t.UnixMicro()
}
func fileTimestampToTime(t int64) time.Time {
return time.UnixMicro(t)
}
func (f *file) loadMetaByPath() (*FileMeta, error) {
var fileMeta FileMeta
if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil {
return nil, err
} else if ok {
f.metaID = fileMeta.ID
f.blockSize = fileMeta.BlockSize
return &fileMeta, nil
}
return nil, nil
}
func (f *file) open(flag int) (err error) {
// see os.OpenFile for flag values
if flag&os.O_WRONLY != 0 {
f.allowWrite = true
} else if flag&os.O_RDWR != 0 {
f.allowRead = true
f.allowWrite = true
} else /* O_RDONLY */ {
f.allowRead = true
}
if f.allowWrite {
if flag&os.O_CREATE != 0 {
if flag&os.O_EXCL != 0 {
// file must not exist.
if f.metaID != 0 {
return os.ErrExist
}
} else {
// create a new file if none exists.
if f.metaID == 0 {
if err = f.createEmpty(); err != nil {
return err
}
}
}
}
if flag&os.O_TRUNC != 0 {
if err = f.truncate(); err != nil {
return err
}
}
if flag&os.O_APPEND != 0 {
if _, err = f.Seek(0, io.SeekEnd); err != nil {
return err
}
}
return nil
}
// read only mode
if f.metaID == 0 {
return os.ErrNotExist
}
return nil
}
func (f *file) createEmpty() error {
if f.metaID != 0 {
return os.ErrExist
}
now := time.Now()
_, err := db.GetEngine(f.ctx).Insert(&FileMeta{
FullPath: f.fullPath,
BlockSize: f.blockSize,
CreateTimestamp: timeToFileTimestamp(now),
ModifyTimestamp: timeToFileTimestamp(now),
})
if err != nil {
return err
}
if _, err = f.loadMetaByPath(); err != nil {
return err
}
return nil
}
func (f *file) truncate() error {
if f.metaID == 0 {
return os.ErrNotExist
}
return db.WithTx(func(ctx context.Context) error {
if _, err := db.GetEngine(ctx).Exec("UPDATE file_meta SET file_size = 0 WHERE id = ?", f.metaID); err != nil {
return err
}
if _, err := db.GetEngine(ctx).Delete(&FileData{MetaID: f.metaID}); err != nil {
return err
}
return nil
}, f.ctx)
}
func (f *file) renameTo(newPath string) error {
if f.metaID == 0 {
return os.ErrNotExist
}
newPath = buildPath(newPath)
return db.WithTx(func(ctx context.Context) error {
if _, err := db.GetEngine(ctx).Exec("UPDATE file_meta SET full_path = ? WHERE id = ?", newPath, f.metaID); err != nil {
return err
}
return nil
}, f.ctx)
}
func (f *file) delete() error {
if f.metaID == 0 {
return os.ErrNotExist
}
return db.WithTx(func(ctx context.Context) error {
if _, err := db.GetEngine(ctx).Delete(&FileMeta{ID: f.metaID}); err != nil {
return err
}
if _, err := db.GetEngine(ctx).Delete(&FileData{MetaID: f.metaID}); err != nil {
return err
}
return nil
}, f.ctx)
}
func (f *file) size() (int64, error) {
if f.metaID == 0 {
return 0, os.ErrNotExist
}
fileMeta, err := findFileMetaById(f.ctx, f.metaID)
if err != nil {
return 0, err
}
return fileMeta.FileSize, nil
}
func findFileMetaById(ctx context.Context, metaID int64) (*FileMeta, error) {
var fileMeta FileMeta
if ok, err := db.GetEngine(ctx).Where("id = ?", metaID).Get(&fileMeta); err != nil {
return nil, err
} else if ok {
return &fileMeta, nil
}
return nil, nil
}
func buildPath(path string) string {
path = filepath.Clean(path)
path = strings.ReplaceAll(path, "\\", "/")
path = strings.TrimPrefix(path, "/")
return strconv.Itoa(strings.Count(path, "/")) + ":" + path
}
func newDbFile(ctx context.Context, path string) (*file, error) {
path = buildPath(path)
f := &file{ctx: ctx, fullPath: path, blockSize: defaultFileBlockSize}
if _, err := f.loadMetaByPath(); err != nil {
return nil, err
}
return f, nil
}

74
models/dbfs/dbfs.go Normal file

@ -0,0 +1,74 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package dbfs
import (
"context"
"os"
"code.gitea.io/gitea/models/db"
)
type FileMeta struct {
ID int64 `xorm:"pk autoincr"`
FullPath string `xorm:"VARCHAR(500) UNIQUE NOT NULL"`
BlockSize int64 `xorm:"BIGINT NOT NULL"`
FileSize int64 `xorm:"BIGINT NOT NULL"`
CreateTimestamp int64 `xorm:"BIGINT NOT NULL"`
ModifyTimestamp int64 `xorm:"BIGINT NOT NULL"`
}
type FileData struct {
ID int64 `xorm:"pk autoincr"`
Revision int64 `xorm:"BIGINT NOT NULL"`
MetaID int64 `xorm:"BIGINT index(meta_offset) NOT NULL"`
BlobOffset int64 `xorm:"BIGINT index(meta_offset) NOT NULL"`
BlobSize int64 `xorm:"BIGINT NOT NULL"`
BlobData []byte `xorm:"BLOB NOT NULL"`
}
func init() {
db.RegisterModel(new(FileMeta))
db.RegisterModel(new(FileData))
}
func OpenFile(ctx context.Context, name string, flag int) (File, error) {
f, err := newDbFile(ctx, name)
if err != nil {
return nil, err
}
err = f.open(flag)
if err != nil {
_ = f.Close()
return nil, err
}
return f, nil
}
func Open(ctx context.Context, name string) (File, error) {
return OpenFile(ctx, name, os.O_RDONLY)
}
func Create(ctx context.Context, name string) (File, error) {
return OpenFile(ctx, name, os.O_RDWR|os.O_CREATE|os.O_TRUNC)
}
func Rename(ctx context.Context, oldPath, newPath string) error {
f, err := newDbFile(ctx, oldPath)
if err != nil {
return err
}
defer f.Close()
return f.renameTo(newPath)
}
func Remove(ctx context.Context, name string) error {
f, err := newDbFile(ctx, name)
if err != nil {
return err
}
defer f.Close()
return f.delete()
}

154
models/dbfs/dbfs_test.go Normal file

@ -0,0 +1,154 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package dbfs
import (
"bufio"
"io"
"os"
"testing"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
_ "github.com/mattn/go-sqlite3"
)
func changeDefaultFileBlockSize(n int64) (restore func()) {
old := defaultFileBlockSize
defaultFileBlockSize = n
return func() {
defaultFileBlockSize = old
}
}
func TestDbfsBasic(t *testing.T) {
defer changeDefaultFileBlockSize(4)()
assert.NoError(t, unittest.PrepareTestDatabase())
// test basic write/read
f, err := OpenFile(db.DefaultContext, "test.txt", os.O_RDWR|os.O_CREATE)
assert.NoError(t, err)
n, err := f.Write([]byte("0123456789")) // blocks: 0123 4567 89
assert.NoError(t, err)
assert.EqualValues(t, 10, n)
_, err = f.Seek(0, io.SeekStart)
assert.NoError(t, err)
buf, err := io.ReadAll(f)
assert.NoError(t, err)
assert.EqualValues(t, 10, n)
assert.EqualValues(t, "0123456789", string(buf))
// write some new data
_, err = f.Seek(1, io.SeekStart)
assert.NoError(t, err)
_, err = f.Write([]byte("bcdefghi")) // blocks: 0bcd efgh i9
assert.NoError(t, err)
// read from offset
buf, err = io.ReadAll(f)
assert.NoError(t, err)
assert.EqualValues(t, "9", string(buf))
// read all
_, err = f.Seek(0, io.SeekStart)
assert.NoError(t, err)
buf, err = io.ReadAll(f)
assert.NoError(t, err)
assert.EqualValues(t, "0bcdefghi9", string(buf))
// write to new size
_, err = f.Seek(-1, io.SeekEnd)
assert.NoError(t, err)
_, err = f.Write([]byte("JKLMNOP")) // blocks: 0bcd efgh iJKL MNOP
assert.NoError(t, err)
_, err = f.Seek(0, io.SeekStart)
assert.NoError(t, err)
buf, err = io.ReadAll(f)
assert.NoError(t, err)
assert.EqualValues(t, "0bcdefghiJKLMNOP", string(buf))
// write beyond EOF and fill with zero
_, err = f.Seek(5, io.SeekCurrent)
assert.NoError(t, err)
_, err = f.Write([]byte("xyzu")) // blocks: 0bcd efgh iJKL MNOP 0000 0xyz u
assert.NoError(t, err)
_, err = f.Seek(0, io.SeekStart)
assert.NoError(t, err)
buf, err = io.ReadAll(f)
assert.NoError(t, err)
assert.EqualValues(t, "0bcdefghiJKLMNOP\x00\x00\x00\x00\x00xyzu", string(buf))
// write to the block with zeros
_, err = f.Seek(-6, io.SeekCurrent)
assert.NoError(t, err)
_, err = f.Write([]byte("ABCD")) // blocks: 0bcd efgh iJKL MNOP 000A BCDz u
assert.NoError(t, err)
_, err = f.Seek(0, io.SeekStart)
assert.NoError(t, err)
buf, err = io.ReadAll(f)
assert.NoError(t, err)
assert.EqualValues(t, "0bcdefghiJKLMNOP\x00\x00\x00ABCDzu", string(buf))
assert.NoError(t, f.Close())
// test rename
err = Rename(db.DefaultContext, "test.txt", "test2.txt")
assert.NoError(t, err)
_, err = OpenFile(db.DefaultContext, "test.txt", os.O_RDONLY)
assert.Error(t, err)
f, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY)
assert.NoError(t, err)
assert.NoError(t, f.Close())
// test remove
err = Remove(db.DefaultContext, "test2.txt")
assert.NoError(t, err)
_, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY)
assert.Error(t, err)
}
func TestDbfsReadWrite(t *testing.T) {
defer changeDefaultFileBlockSize(4)()
assert.NoError(t, unittest.PrepareTestDatabase())
f1, err := OpenFile(db.DefaultContext, "test.log", os.O_RDWR|os.O_CREATE)
assert.NoError(t, err)
defer f1.Close()
f2, err := OpenFile(db.DefaultContext, "test.log", os.O_RDONLY)
assert.NoError(t, err)
defer f2.Close()
_, err = f1.Write([]byte("line 1\n"))
assert.NoError(t, err)
f2r := bufio.NewReader(f2)
line, err := f2r.ReadString('\n')
assert.NoError(t, err)
assert.EqualValues(t, "line 1\n", line)
_, err = f2r.ReadString('\n')
assert.ErrorIs(t, err, io.EOF)
_, err = f1.Write([]byte("line 2\n"))
assert.NoError(t, err)
line, err = f2r.ReadString('\n')
assert.NoError(t, err)
assert.EqualValues(t, "line 2\n", line)
_, err = f2r.ReadString('\n')
assert.ErrorIs(t, err, io.EOF)
}

28
models/dbfs/main_test.go Normal file

@ -0,0 +1,28 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package dbfs
import (
"path/filepath"
"testing"
"code.gitea.io/gitea/models/unittest"
"code.gitea.io/gitea/modules/setting"
_ "code.gitea.io/gitea/models"
_ "code.gitea.io/gitea/models/repo"
_ "code.gitea.io/gitea/models/user"
)
func init() {
setting.SetCustomPathAndConf("", "", "")
setting.LoadForTest()
}
func TestMain(m *testing.M) {
unittest.MainTest(m, &unittest.TestOptions{
GiteaRootPath: filepath.Join("..", ".."),
})
}

@ -25,7 +25,7 @@ func DetectWorkflows(commit *git.Commit, event webhook.HookEventType) (git.Entri
return nil, nil, err
}
entries, err := tree.ListEntriesRecursive()
entries, err := tree.ListEntriesRecursiveFast()
if err != nil {
return nil, nil, err
}

@ -0,0 +1,51 @@
package dev
import (
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/dbfs"
"code.gitea.io/gitea/modules/context"
)
var demoLogWriterOnce sync.Once
func TermDemo(ctx *context.Context) {
demoLogWriterOnce.Do(func() {
go func() {
f, _ := dbfs.OpenFile(db.DefaultContext, "termdemo.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND)
count := 0
for {
count++
s := fmt.Sprintf("\x1B[1;3;31mDemo Log\x1B[0m, count=%d\r\n", count)
_, _ = f.Write([]byte(s))
time.Sleep(time.Second)
}
}()
})
cmd := ctx.FormString("cmd")
if cmd == "tail" {
offset := ctx.FormInt64("offset")
f, _ := dbfs.OpenFile(db.DefaultContext, "termdemo.log", os.O_RDONLY)
if offset == -1 {
_, _ = f.Seek(0, io.SeekEnd)
} else {
_, _ = f.Seek(offset, io.SeekStart)
}
buf, _ := io.ReadAll(f)
offset, _ = f.Seek(0, io.SeekCurrent)
ctx.JSON(http.StatusOK, map[string]interface{}{
"offset": offset,
"content": string(buf),
})
return
}
ctx.HTML(http.StatusOK, "dev/termdemo")
}

@ -28,6 +28,7 @@ import (
"code.gitea.io/gitea/modules/web/routing"
"code.gitea.io/gitea/routers/web/admin"
"code.gitea.io/gitea/routers/web/auth"
"code.gitea.io/gitea/routers/web/dev"
"code.gitea.io/gitea/routers/web/events"
"code.gitea.io/gitea/routers/web/explore"
"code.gitea.io/gitea/routers/web/feed"
@ -659,6 +660,10 @@ func RegisterRoutes(m *web.Route) {
m.Post("/{username}", reqSignIn, context_service.UserAssignmentWeb(), user.Action)
if !setting.IsProd {
m.Any("/dev/termdemo", dev.TermDemo)
}
reqRepoAdmin := context.RequireRepoAdmin()
reqRepoCodeWriter := context.RequireRepoWriter(unit.TypeCode)
canEnableEditor := context.CanEnableEditor()

@ -0,0 +1,27 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/xterm@4.19.0/css/xterm.css" />
<script src="https://cdn.jsdelivr.net/npm/xterm@4.19.0/lib/xterm.js"></script>
</head>
<body>
<div id="terminal"></div>
<script type="module">
const term = new Terminal();
term.open(document.getElementById('terminal'));
term.write('Hello from \x1B[1;3;31mxterm.js\x1B[0m!\r\n');
let offset = -1;
setInterval(async ()=>{
const res = await fetch('?cmd=tail&offset='+offset);
const data = await res.json();
console.log("tail resp data", data);
offset = data.offset;
term.write(data.content);
}, 500);
</script>
</body>
</html>