mirror of https://github.com/docker/compose.git
Merge pull request #10791 from milas/watch-refactor-sync
watch: move sync logic into separate package
This commit is contained in:
commit
8318f66330
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
Copyright 2023 Docker Compose CLI authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
|
||||
"github.com/compose-spec/compose-go/types"
|
||||
"github.com/docker/compose/v2/pkg/api"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ComposeClient interface {
|
||||
Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error)
|
||||
|
||||
Copy(ctx context.Context, projectName string, options api.CopyOptions) error
|
||||
}
|
||||
|
||||
type DockerCopy struct {
|
||||
client ComposeClient
|
||||
|
||||
projectName string
|
||||
|
||||
infoWriter io.Writer
|
||||
}
|
||||
|
||||
var _ Syncer = &DockerCopy{}
|
||||
|
||||
func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy {
|
||||
return &DockerCopy{
|
||||
projectName: projectName,
|
||||
client: client,
|
||||
infoWriter: infoWriter,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
|
||||
var errs []error
|
||||
for i := range paths {
|
||||
if err := d.sync(ctx, service, paths[i]); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error {
|
||||
scale := 1
|
||||
if service.Deploy != nil && service.Deploy.Replicas != nil {
|
||||
scale = int(*service.Deploy.Replicas)
|
||||
}
|
||||
|
||||
if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil {
|
||||
if fi.IsDir() {
|
||||
for i := 1; i <= scale; i++ {
|
||||
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
|
||||
Service: pathMapping.Service,
|
||||
Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
|
||||
Index: i,
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
|
||||
} else {
|
||||
err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
|
||||
Source: pathMapping.HostPath,
|
||||
Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath)
|
||||
}
|
||||
} else if errors.Is(statErr, fs.ErrNotExist) {
|
||||
for i := 1; i <= scale; i++ {
|
||||
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
|
||||
Service: pathMapping.Service,
|
||||
Command: []string{"rm", "-rf", pathMapping.ContainerPath},
|
||||
Index: i,
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
Copyright 2023 Docker Compose CLI authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/compose-spec/compose-go/types"
|
||||
)
|
||||
|
||||
// PathMapping contains the Compose service and modified host system path.
|
||||
type PathMapping struct {
|
||||
// Service that the file event is for.
|
||||
Service string
|
||||
// HostPath that was created/modified/deleted outside the container.
|
||||
//
|
||||
// This is the path as seen from the user's perspective, e.g.
|
||||
// - C:\Users\moby\Documents\hello-world\main.go (file on Windows)
|
||||
// - /Users/moby/Documents/hello-world (directory on macOS)
|
||||
HostPath string
|
||||
// ContainerPath for the target file inside the container (only populated
|
||||
// for sync events, not rebuild).
|
||||
//
|
||||
// This is the path as used in Docker CLI commands, e.g.
|
||||
// - /workdir/main.go
|
||||
// - /workdir/subdir
|
||||
ContainerPath string
|
||||
}
|
||||
|
||||
type Syncer interface {
|
||||
Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
/*
|
||||
|
||||
Copyright 2020 Docker Compose CLI authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
@ -17,13 +17,13 @@ package compose
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/docker/compose/v2/internal/sync"
|
||||
|
||||
"github.com/compose-spec/compose-go/types"
|
||||
"github.com/jonboulle/clockwork"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
@ -54,11 +54,8 @@ type Trigger struct {
|
|||
|
||||
const quietPeriod = 2 * time.Second
|
||||
|
||||
// fileMapping contains the Compose service and modified host system path.
|
||||
//
|
||||
// For file sync, the container path is also included.
|
||||
// For rebuild, there is no container path, so it is always empty.
|
||||
type fileMapping struct {
|
||||
// fileEvent contains the Compose service and modified host system path.
|
||||
type fileEvent struct {
|
||||
// Service that the file event is for.
|
||||
Service string
|
||||
// HostPath that was created/modified/deleted outside the container.
|
||||
|
@ -67,17 +64,11 @@ type fileMapping struct {
|
|||
// - C:\Users\moby\Documents\hello-world\main.go
|
||||
// - /Users/moby/Documents/hello-world/main.go
|
||||
HostPath string
|
||||
// ContainerPath for the target file inside the container (only populated
|
||||
// for sync events, not rebuild).
|
||||
//
|
||||
// This is the path as used in Docker CLI commands, e.g.
|
||||
// - /workdir/main.go
|
||||
ContainerPath string
|
||||
}
|
||||
|
||||
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo
|
||||
needRebuild := make(chan fileMapping)
|
||||
needSync := make(chan fileMapping)
|
||||
needRebuild := make(chan fileEvent)
|
||||
needSync := make(chan sync.PathMapping)
|
||||
|
||||
_, err := s.prepareProjectForBuild(project, nil)
|
||||
if err != nil {
|
||||
|
@ -175,7 +166,7 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
|
|||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan fileMapping, needRebuild chan fileMapping) error {
|
||||
func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error {
|
||||
ignores := make([]watch.PathMatcher, len(triggers))
|
||||
for i, trigger := range triggers {
|
||||
ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
|
||||
|
@ -209,11 +200,6 @@ WATCH:
|
|||
|
||||
fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath)
|
||||
|
||||
f := fileMapping{
|
||||
HostPath: hostPath,
|
||||
Service: name,
|
||||
}
|
||||
|
||||
switch trigger.Action {
|
||||
case WatchActionSync:
|
||||
logrus.Debugf("modified file %s triggered sync", hostPath)
|
||||
|
@ -221,12 +207,18 @@ WATCH:
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// always use Unix-style paths for inside the container
|
||||
f.ContainerPath = path.Join(trigger.Target, rel)
|
||||
needSync <- f
|
||||
needSync <- sync.PathMapping{
|
||||
Service: name,
|
||||
HostPath: hostPath,
|
||||
// always use Unix-style paths for inside the container
|
||||
ContainerPath: path.Join(trigger.Target, rel),
|
||||
}
|
||||
case WatchActionRebuild:
|
||||
logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
|
||||
needRebuild <- f
|
||||
needRebuild <- fileEvent{
|
||||
HostPath: hostPath,
|
||||
Service: name,
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("watch action %q is not supported", trigger)
|
||||
}
|
||||
|
@ -304,57 +296,25 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
|
|||
}
|
||||
}
|
||||
|
||||
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
|
||||
func (s *composeService) makeSyncFn(
|
||||
ctx context.Context,
|
||||
project *types.Project,
|
||||
needSync <-chan sync.PathMapping,
|
||||
) func() error {
|
||||
syncer := sync.NewDockerCopy(project.Name, s, s.stdinfo())
|
||||
|
||||
return func() error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case opt := <-needSync:
|
||||
service, err := project.GetService(opt.Service)
|
||||
case pathMapping := <-needSync:
|
||||
service, err := project.GetService(pathMapping.Service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
scale := 1
|
||||
if service.Deploy != nil && service.Deploy.Replicas != nil {
|
||||
scale = int(*service.Deploy.Replicas)
|
||||
}
|
||||
|
||||
if fi, statErr := os.Stat(opt.HostPath); statErr == nil {
|
||||
if fi.IsDir() {
|
||||
for i := 1; i <= scale; i++ {
|
||||
_, err := s.Exec(ctx, project.Name, api.RunOptions{
|
||||
Service: opt.Service,
|
||||
Command: []string{"mkdir", "-p", opt.ContainerPath},
|
||||
Index: i,
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Warnf("failed to create %q from %s: %v", opt.ContainerPath, opt.Service, err)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(s.stdinfo(), "%s created\n", opt.ContainerPath)
|
||||
} else {
|
||||
err := s.Copy(ctx, project.Name, api.CopyOptions{
|
||||
Source: opt.HostPath,
|
||||
Destination: fmt.Sprintf("%s:%s", opt.Service, opt.ContainerPath),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(s.stdinfo(), "%s updated\n", opt.ContainerPath)
|
||||
}
|
||||
} else if errors.Is(statErr, fs.ErrNotExist) {
|
||||
for i := 1; i <= scale; i++ {
|
||||
_, err := s.Exec(ctx, project.Name, api.RunOptions{
|
||||
Service: opt.Service,
|
||||
Command: []string{"rm", "-rf", opt.ContainerPath},
|
||||
Index: i,
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Warnf("failed to delete %q from %s: %v", opt.ContainerPath, opt.Service, err)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(s.stdinfo(), "%s deleted from service\n", opt.ContainerPath)
|
||||
if err := syncer.Sync(ctx, service, []sync.PathMapping{pathMapping}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -363,7 +323,7 @@ func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project,
|
|||
|
||||
type rebuildServices map[string]utils.Set[string]
|
||||
|
||||
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
|
||||
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) {
|
||||
services := make(rebuildServices)
|
||||
t := clock.NewTimer(delay)
|
||||
defer t.Stop()
|
||||
|
|
|
@ -19,6 +19,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/compose/v2/internal/sync"
|
||||
|
||||
"github.com/docker/cli/cli/command"
|
||||
"github.com/docker/compose/v2/pkg/watch"
|
||||
"github.com/jonboulle/clockwork"
|
||||
|
@ -27,7 +29,7 @@ import (
|
|||
)
|
||||
|
||||
func Test_debounce(t *testing.T) {
|
||||
ch := make(chan fileMapping)
|
||||
ch := make(chan fileEvent)
|
||||
var (
|
||||
ran int
|
||||
got []string
|
||||
|
@ -47,7 +49,7 @@ func Test_debounce(t *testing.T) {
|
|||
return nil
|
||||
})
|
||||
for i := 0; i < 100; i++ {
|
||||
ch <- fileMapping{Service: "test"}
|
||||
ch <- fileEvent{Service: "test"}
|
||||
}
|
||||
assert.Equal(t, ran, 0)
|
||||
clock.Advance(quietPeriod)
|
||||
|
@ -79,8 +81,8 @@ func (t testWatcher) Errors() chan error {
|
|||
}
|
||||
|
||||
func Test_sync(t *testing.T) {
|
||||
needSync := make(chan fileMapping)
|
||||
needRebuild := make(chan fileMapping)
|
||||
needSync := make(chan sync.PathMapping)
|
||||
needRebuild := make(chan fileEvent)
|
||||
ctx, cancelFunc := context.WithCancel(context.TODO())
|
||||
defer cancelFunc()
|
||||
|
||||
|
@ -119,7 +121,7 @@ func Test_sync(t *testing.T) {
|
|||
watcher.Events() <- watch.NewFileEvent("/src/changed")
|
||||
select {
|
||||
case actual := <-needSync:
|
||||
assert.DeepEqual(t, fileMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
|
||||
assert.DeepEqual(t, sync.PathMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("timeout")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue