diff --git a/go.mod b/go.mod index 5e6529e0e4..2e6d50f7d2 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.18 require ( code.gitea.io/gitea-vet v0.2.2-0.20220122151748-48ebc902541b code.gitea.io/sdk/gitea v0.15.1 + gitea.com/gitea/proto v0.0.0-20220802024851-7ee5947f928a + gitea.com/go-chi/binding v0.0.0-20220309004920-114340dabecb codeberg.org/gusted/mcaptcha v0.0.0-20220723083913-4f3072e1d570 gitea.com/go-chi/binding v0.0.0-20221013104517-b29891619681 gitea.com/go-chi/cache v0.2.0 @@ -54,6 +56,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/feeds v1.1.1 github.com/gorilla/sessions v1.2.1 + github.com/gorilla/websocket v1.4.2 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 github.com/huandu/xstrings v1.3.2 @@ -71,6 +74,7 @@ require ( github.com/microcosm-cc/bluemonday v1.0.20 github.com/minio/minio-go/v7 v7.0.39 github.com/msteinert/pam v1.1.0 + github.com/nektos/act v0.2.24 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/niklasfasching/go-org v1.6.5 github.com/oliamb/cutter v0.2.2 @@ -198,7 +202,6 @@ require ( github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/securecookie v1.1.1 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect @@ -233,7 +236,6 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mrjones/oauth v0.0.0-20190623134757-126b35219450 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nektos/act v0.2.26 // indirect github.com/nwaples/rardecode v1.1.3 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect diff --git a/modules/graceful/server_http.go b/modules/graceful/server_http.go index 8ab2bdf41f..c1b9cf0b63 100644 --- a/modules/graceful/server_http.go +++ b/modules/graceful/server_http.go @@ -9,6 +9,9 @@ import ( "crypto/tls" "net" "net/http" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" ) func newHTTPServer(network, address, name string, handler http.Handler) (*Server, ServeFunction) { @@ -17,7 +20,7 @@ func newHTTPServer(network, address, name string, handler http.Handler) (*Server ReadTimeout: DefaultReadTimeOut, WriteTimeout: DefaultWriteTimeOut, MaxHeaderBytes: DefaultMaxHeaderBytes, - Handler: handler, + Handler: h2c.NewHandler(handler, &http2.Server{}), BaseContext: func(net.Listener) context.Context { return GetManager().HammerContext() }, } server.OnShutdown = func() { diff --git a/routers/api/bots/bots.go b/routers/api/bots/bots.go index 89d445c6f1..205749a4a2 100644 --- a/routers/api/bots/bots.go +++ b/routers/api/bots/bots.go @@ -5,207 +5,48 @@ package bots import ( - "encoding/json" - "errors" - "fmt" - "net/http" - "strings" - "time" - - bots_model "code.gitea.io/gitea/models/bots" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/web" + "gitea.com/gitea/proto/gen/proto/v1/v1connect" - "github.com/gorilla/websocket" + "github.com/bufbuild/connect-go" + grpchealth "github.com/bufbuild/connect-grpchealth-go" + grpcreflect "github.com/bufbuild/connect-grpcreflect-go" ) -func Routes() *web.Route { - r := web.NewRoute() - r.Get("/", Serve) - return r -} - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 4096, - WriteBufferSize: 4096, - EnableCompression: true, - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -var pongWait = 60 * time.Second - -type Message struct { - Version int // - Type int // message type, 1 register 2 error 3 task 4 no task - RunnerUUID string // runner uuid - BuildUUID string // build uuid - ErrCode int // error code - ErrContent string // errors message - EventName string - EventPayload string - JobID string // only run the special job, empty means run all the jobs -} - -const ( - version1 = 1 -) - -const ( - MsgTypeRegister = iota + 1 // register - MsgTypeError // error - MsgTypeRequestBuild // request build task - MsgTypeIdle // no task - MsgTypeBuildResult // build result - MsgTypeBuildJobResult // build job result -) - -func handleVersion1(r *http.Request, c *websocket.Conn, mt int, message []byte, msg *Message) error { - switch msg.Type { - case MsgTypeRegister: - log.Info("websocket[%s] registered", r.RemoteAddr) - runner, err := bots_model.GetRunnerByUUID(msg.RunnerUUID) - if err != nil { - if !errors.Is(err, bots_model.ErrRunnerNotExist{}) { - return fmt.Errorf("websocket[%s] get runner [%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err) - } - err = c.WriteMessage(mt, message) - if err != nil { - return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) - } - } else { - fmt.Printf("-----%v\n", runner) - // TODO: handle read message - err = c.WriteMessage(mt, message) - if err != nil { - return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) - } - } - case MsgTypeRequestBuild: - // TODO: find new task and send to client - build, err := bots_model.GetCurBuildByUUID(msg.RunnerUUID) - if err != nil { - return fmt.Errorf("websocket[%s] get task[%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err) - } - var returnMsg *Message - if build == nil { - time.Sleep(3 * time.Second) - returnMsg = &Message{ - Version: version1, - Type: MsgTypeIdle, - RunnerUUID: msg.RunnerUUID, - } - } else { - returnMsg = &Message{ - Version: version1, - Type: MsgTypeRequestBuild, - RunnerUUID: msg.RunnerUUID, - BuildUUID: build.UUID, - EventName: build.Event.Event(), - EventPayload: build.EventPayload, - } - } - bs, err := json.Marshal(&returnMsg) - if err != nil { - return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) - } - err = c.WriteMessage(mt, bs) - if err != nil { - return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) - } - case MsgTypeBuildResult: - log.Info("websocket[%s] returned CI result: %v", r.RemoteAddr, msg) - build, err := bots_model.GetBuildByUUID(msg.BuildUUID) - if err != nil { - return fmt.Errorf("websocket[%s] get build by uuid failed: %v", r.RemoteAddr, err) - } - cols := []string{"status", "end_time"} - if msg.ErrCode == 0 { - build.Status = bots_model.BuildFinished - } else { - build.Status = bots_model.BuildFailed - } - build.EndTime = timeutil.TimeStampNow() - if err := bots_model.UpdateBuild(build, cols...); err != nil { - log.Error("websocket[%s] update build failed: %v", r.RemoteAddr, err) - } - default: - returnMsg := Message{ - Version: version1, - Type: MsgTypeError, - ErrCode: 1, - ErrContent: fmt.Sprintf("message type %d is not supported", msg.Type), - } - bs, err := json.Marshal(&returnMsg) - if err != nil { - return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) - } - err = c.WriteMessage(mt, bs) - if err != nil { - return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) - } - } - return nil -} - -func Serve(w http.ResponseWriter, r *http.Request) { - log.Trace("websocket init request begin from %s", r.RemoteAddr) - c, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Error("websocket upgrade failed: %v", err) - return - } - defer c.Close() - log.Trace("websocket upgrade from %s successfully", r.RemoteAddr) - - c.SetReadDeadline(time.Now().Add(pongWait)) - c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - - for { - // read message from client - mt, message, err := c.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || - websocket.IsCloseError(err, websocket.CloseNormalClosure) { - c.Close() - } else if !strings.Contains(err.Error(), "i/o timeout") { - log.Error("websocket[%s] read failed: %#v", r.RemoteAddr, err) - } - break - } - - log.Trace("websocket[%s] received message: %s", r.RemoteAddr, string(message)) - - // read message first - var msg Message - if err = json.Unmarshal(message, &msg); err != nil { - log.Error("websocket[%s] unmarshal failed: %#v", r.RemoteAddr, err) - break - } - - switch msg.Version { - case 1: - if err := handleVersion1(r, c, mt, message, &msg); err != nil { - log.Error("%v", err) - } - default: - returnMsg := Message{ - Version: 1, - Type: MsgTypeError, - ErrCode: 1, - ErrContent: "version is not supported", - } - bs, err := json.Marshal(&returnMsg) - if err != nil { - log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) - } else { - err = c.WriteMessage(mt, bs) - if err != nil { - log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err) - } - } - } - } +func Routes(r *web.Route) { + compress1KB := connect.WithCompressMinBytes(1024) + + service := &RunnerService{} + path, handler := v1connect.NewBuildServiceHandler( + service, + compress1KB, + ) + + // grpcV1 + grpcPath, gHandler := grpcreflect.NewHandlerV1( + grpcreflect.NewStaticReflector(v1connect.BuildServiceName), + compress1KB, + ) + + // grpcV1Alpha + grpcAlphaPath, gAlphaHandler := grpcreflect.NewHandlerV1Alpha( + grpcreflect.NewStaticReflector(v1connect.BuildServiceName), + compress1KB, + ) + + // grpcHealthCheck + grpcHealthPath, gHealthHandler := grpchealth.NewHandler( + grpchealth.NewStaticChecker(v1connect.BuildServiceName), + compress1KB, + ) + + // socket connection + r.Get("/socket", socketServe) + // restful connection + r.Post(path+"{name}", giteaHandler(handler)) + // grpc connection + r.Post(grpcPath+"{name}", giteaHandler(gHandler)) + r.Post(grpcAlphaPath+"{name}", giteaHandler(gAlphaHandler)) + // healthy check connection + r.Post(grpcHealthPath+"{name}", giteaHandler(gHealthHandler)) } diff --git a/routers/api/bots/process.go b/routers/api/bots/process.go new file mode 100644 index 0000000000..6abb1f4399 --- /dev/null +++ b/routers/api/bots/process.go @@ -0,0 +1,48 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "context" + "net/http" + + "code.gitea.io/gitea/modules/log" + v1 "gitea.com/gitea/proto/gen/proto/v1" + + "github.com/bufbuild/connect-go" +) + +type RunnerService struct{} + +func (s *RunnerService) Connect( + ctx context.Context, + req *connect.Request[v1.ConnectRequest], +) (*connect.Response[v1.ConnectResponse], error) { + log.Info("Request headers: %v", req.Header()) + res := connect.NewResponse(&v1.ConnectResponse{ + JobId: 100, + }) + res.Header().Set("Gitea-Version", "v1") + return res, nil +} + +func (s *RunnerService) Accept( + ctx context.Context, + req *connect.Request[v1.AcceptRequest], +) (*connect.Response[v1.AcceptResponse], error) { + log.Info("Request headers: %v", req.Header()) + res := connect.NewResponse(&v1.AcceptResponse{ + JobId: 100, + }) + res.Header().Set("Gitea-Version", "v1") + return res, nil +} + +func giteaHandler(h http.Handler) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log.Info("Got connection: %v", r.Proto) + h.ServeHTTP(w, r) + }) +} diff --git a/routers/api/bots/socket.go b/routers/api/bots/socket.go new file mode 100644 index 0000000000..7d508192d2 --- /dev/null +++ b/routers/api/bots/socket.go @@ -0,0 +1,206 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package bots + +import ( + "errors" + "fmt" + "net/http" + "strings" + "time" + + bots_model "code.gitea.io/gitea/models/bots" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 4096, + WriteBufferSize: 4096, + EnableCompression: true, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +var pongWait = 60 * time.Second + +type Message struct { + Version int // + Type int // message type, 1 register 2 error 3 task 4 no task + RunnerUUID string // runner uuid + BuildUUID string // build uuid + ErrCode int // error code + ErrContent string // errors message + EventName string + EventPayload string + JobID string // only run the special job, empty means run all the jobs +} + +const ( + version1 = 1 +) + +const ( + MsgTypeRegister = iota + 1 // register + MsgTypeError // error + MsgTypeRequestBuild // request build task + MsgTypeIdle // no task + MsgTypeBuildResult // build result + MsgTypeBuildJobResult // build job result +) + +func handleVersion1(r *http.Request, c *websocket.Conn, mt int, message []byte, msg *Message) error { + switch msg.Type { + case MsgTypeRegister: + log.Info("websocket[%s] registered", r.RemoteAddr) + runner, err := bots_model.GetRunnerByUUID(msg.RunnerUUID) + if err != nil { + if !errors.Is(err, bots_model.ErrRunnerNotExist{}) { + return fmt.Errorf("websocket[%s] get runner [%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err) + } + err = c.WriteMessage(mt, message) + if err != nil { + return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + } else { + fmt.Printf("-----%v\n", runner) + // TODO: handle read message + err = c.WriteMessage(mt, message) + if err != nil { + return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + } + case MsgTypeRequestBuild: + // TODO: find new task and send to client + build, err := bots_model.GetCurBuildByUUID(msg.RunnerUUID) + if err != nil { + return fmt.Errorf("websocket[%s] get task[%s] failed: %v", r.RemoteAddr, msg.RunnerUUID, err) + } + var returnMsg *Message + if build == nil { + time.Sleep(3 * time.Second) + returnMsg = &Message{ + Version: version1, + Type: MsgTypeIdle, + RunnerUUID: msg.RunnerUUID, + } + } else { + returnMsg = &Message{ + Version: version1, + Type: MsgTypeRequestBuild, + RunnerUUID: msg.RunnerUUID, + BuildUUID: build.UUID, + EventName: build.Event.Event(), + EventPayload: build.EventPayload, + } + } + bs, err := json.Marshal(&returnMsg) + if err != nil { + return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) + } + err = c.WriteMessage(mt, bs) + if err != nil { + return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + case MsgTypeBuildResult: + log.Info("websocket[%s] returned CI result: %v", r.RemoteAddr, msg) + build, err := bots_model.GetBuildByUUID(msg.BuildUUID) + if err != nil { + return fmt.Errorf("websocket[%s] get build by uuid failed: %v", r.RemoteAddr, err) + } + cols := []string{"status", "end_time"} + if msg.ErrCode == 0 { + build.Status = bots_model.BuildFinished + } else { + build.Status = bots_model.BuildFailed + } + build.EndTime = timeutil.TimeStampNow() + if err := bots_model.UpdateBuild(build, cols...); err != nil { + log.Error("websocket[%s] update build failed: %v", r.RemoteAddr, err) + } + default: + returnMsg := Message{ + Version: version1, + Type: MsgTypeError, + ErrCode: 1, + ErrContent: fmt.Sprintf("message type %d is not supported", msg.Type), + } + bs, err := json.Marshal(&returnMsg) + if err != nil { + return fmt.Errorf("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) + } + err = c.WriteMessage(mt, bs) + if err != nil { + return fmt.Errorf("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + } + return nil +} + +func socketServe(w http.ResponseWriter, r *http.Request) { + log.Trace("websocket init request begin from %s", r.RemoteAddr) + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error("websocket upgrade failed: %v", err) + return + } + defer c.Close() + log.Trace("websocket upgrade from %s successfully", r.RemoteAddr) + + _ = c.SetReadDeadline(time.Now().Add(pongWait)) + c.SetPongHandler(func(string) error { + return c.SetReadDeadline(time.Now().Add(pongWait)) + }) + + for { + // read message from client + mt, message, err := c.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || + websocket.IsCloseError(err, websocket.CloseNormalClosure) { + c.Close() + } else if !strings.Contains(err.Error(), "i/o timeout") { + log.Error("websocket[%s] read failed: %#v", r.RemoteAddr, err) + } + break + } + + log.Trace("websocket[%s] received message: %s", r.RemoteAddr, string(message)) + + // read message first + var msg Message + if err = json.Unmarshal(message, &msg); err != nil { + log.Error("websocket[%s] unmarshal failed: %#v", r.RemoteAddr, err) + break + } + + switch msg.Version { + case 1: + if err := handleVersion1(r, c, mt, message, &msg); err != nil { + log.Error("%v", err) + } + default: + returnMsg := Message{ + Version: 1, + Type: MsgTypeError, + ErrCode: 1, + ErrContent: "version is not supported", + } + bs, err := json.Marshal(&returnMsg) + if err != nil { + log.Error("websocket[%s] marshal message failed: %v", r.RemoteAddr, err) + } else { + err = c.WriteMessage(mt, bs) + if err != nil { + log.Error("websocket[%s] sent message failed: %v", r.RemoteAddr, err) + } + } + } + } +} diff --git a/routers/init.go b/routers/init.go index 80c44bf61b..aef71792f5 100644 --- a/routers/init.go +++ b/routers/init.go @@ -198,6 +198,6 @@ func NormalRoutes(ctx context.Context) *web.Route { // This implements the OCI API (Note this is not preceded by /api but is instead /v2) r.Mount("/v2", packages_router.ContainerRoutes(ctx)) } - r.Mount("/api/actions", bots_router.Routes()) + bots_router.Routes(r) return r }