watch: move sync logic into separate package

Just moving some code around in preparation for an alternative
sync implementation that can do bulk transfers by using `tar`.

Signed-off-by: Milas Bowman <milas.bowman@docker.com>
This commit is contained in:
Milas Bowman 2023-07-10 12:16:20 -04:00 committed by Guillaume Lours
parent 9174a99d27
commit cb17c3c8a6
4 changed files with 188 additions and 75 deletions

107
internal/sync/docker_cp.go Normal file
View File

@ -0,0 +1,107 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"github.com/compose-spec/compose-go/types"
"github.com/docker/compose/v2/pkg/api"
"github.com/sirupsen/logrus"
)
type ComposeClient interface {
Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error)
Copy(ctx context.Context, projectName string, options api.CopyOptions) error
}
type DockerCopy struct {
client ComposeClient
projectName string
infoWriter io.Writer
}
var _ Syncer = &DockerCopy{}
func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy {
return &DockerCopy{
projectName: projectName,
client: client,
infoWriter: infoWriter,
}
}
func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
var errs []error
for i := range paths {
if err := d.sync(ctx, service, paths[i]); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error {
scale := 1
if service.Deploy != nil && service.Deploy.Replicas != nil {
scale = int(*service.Deploy.Replicas)
}
if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil {
if fi.IsDir() {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: pathMapping.Service,
Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
}
}
fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
} else {
err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
Source: pathMapping.HostPath,
Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath),
})
if err != nil {
return err
}
fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath)
}
} else if errors.Is(statErr, fs.ErrNotExist) {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: pathMapping.Service,
Command: []string{"rm", "-rf", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
}
}
fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
}
return nil
}

44
internal/sync/shared.go Normal file
View File

@ -0,0 +1,44 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"context"
"github.com/compose-spec/compose-go/types"
)
// PathMapping contains the Compose service and modified host system path.
type PathMapping struct {
// Service that the file event is for.
Service string
// HostPath that was created/modified/deleted outside the container.
//
// This is the path as seen from the user's perspective, e.g.
// - C:\Users\moby\Documents\hello-world\main.go (file on Windows)
// - /Users/moby/Documents/hello-world (directory on macOS)
HostPath string
// ContainerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
// - /workdir/subdir
ContainerPath string
}
type Syncer interface {
Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error
}

View File

@ -1,6 +1,6 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@ -17,13 +17,13 @@ package compose
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/docker/compose/v2/internal/sync"
"github.com/compose-spec/compose-go/types"
"github.com/jonboulle/clockwork"
"github.com/mitchellh/mapstructure"
@ -54,11 +54,8 @@ type Trigger struct {
const quietPeriod = 2 * time.Second
// fileMapping contains the Compose service and modified host system path.
//
// For file sync, the container path is also included.
// For rebuild, there is no container path, so it is always empty.
type fileMapping struct {
// fileEvent contains the Compose service and modified host system path.
type fileEvent struct {
// Service that the file event is for.
Service string
// HostPath that was created/modified/deleted outside the container.
@ -67,17 +64,11 @@ type fileMapping struct {
// - C:\Users\moby\Documents\hello-world\main.go
// - /Users/moby/Documents/hello-world/main.go
HostPath string
// ContainerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
ContainerPath string
}
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo
needRebuild := make(chan fileMapping)
needSync := make(chan fileMapping)
needRebuild := make(chan fileEvent)
needSync := make(chan sync.PathMapping)
_, err := s.prepareProjectForBuild(project, nil)
if err != nil {
@ -175,7 +166,7 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
return eg.Wait()
}
func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan fileMapping, needRebuild chan fileMapping) error {
func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error {
ignores := make([]watch.PathMatcher, len(triggers))
for i, trigger := range triggers {
ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
@ -209,11 +200,6 @@ WATCH:
fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath)
f := fileMapping{
HostPath: hostPath,
Service: name,
}
switch trigger.Action {
case WatchActionSync:
logrus.Debugf("modified file %s triggered sync", hostPath)
@ -221,12 +207,18 @@ WATCH:
if err != nil {
return err
}
// always use Unix-style paths for inside the container
f.ContainerPath = path.Join(trigger.Target, rel)
needSync <- f
needSync <- sync.PathMapping{
Service: name,
HostPath: hostPath,
// always use Unix-style paths for inside the container
ContainerPath: path.Join(trigger.Target, rel),
}
case WatchActionRebuild:
logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
needRebuild <- f
needRebuild <- fileEvent{
HostPath: hostPath,
Service: name,
}
default:
return fmt.Errorf("watch action %q is not supported", trigger)
}
@ -304,57 +296,25 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
}
}
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
func (s *composeService) makeSyncFn(
ctx context.Context,
project *types.Project,
needSync <-chan sync.PathMapping,
) func() error {
syncer := sync.NewDockerCopy(project.Name, s, s.stdinfo())
return func() error {
for {
select {
case <-ctx.Done():
return nil
case opt := <-needSync:
service, err := project.GetService(opt.Service)
case pathMapping := <-needSync:
service, err := project.GetService(pathMapping.Service)
if err != nil {
return err
}
scale := 1
if service.Deploy != nil && service.Deploy.Replicas != nil {
scale = int(*service.Deploy.Replicas)
}
if fi, statErr := os.Stat(opt.HostPath); statErr == nil {
if fi.IsDir() {
for i := 1; i <= scale; i++ {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.Service,
Command: []string{"mkdir", "-p", opt.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to create %q from %s: %v", opt.ContainerPath, opt.Service, err)
}
}
fmt.Fprintf(s.stdinfo(), "%s created\n", opt.ContainerPath)
} else {
err := s.Copy(ctx, project.Name, api.CopyOptions{
Source: opt.HostPath,
Destination: fmt.Sprintf("%s:%s", opt.Service, opt.ContainerPath),
})
if err != nil {
return err
}
fmt.Fprintf(s.stdinfo(), "%s updated\n", opt.ContainerPath)
}
} else if errors.Is(statErr, fs.ErrNotExist) {
for i := 1; i <= scale; i++ {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.Service,
Command: []string{"rm", "-rf", opt.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", opt.ContainerPath, opt.Service, err)
}
}
fmt.Fprintf(s.stdinfo(), "%s deleted from service\n", opt.ContainerPath)
if err := syncer.Sync(ctx, service, []sync.PathMapping{pathMapping}); err != nil {
return err
}
}
}
@ -363,7 +323,7 @@ func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project,
type rebuildServices map[string]utils.Set[string]
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) {
services := make(rebuildServices)
t := clock.NewTimer(delay)
defer t.Stop()

View File

@ -19,6 +19,8 @@ import (
"testing"
"time"
"github.com/docker/compose/v2/internal/sync"
"github.com/docker/cli/cli/command"
"github.com/docker/compose/v2/pkg/watch"
"github.com/jonboulle/clockwork"
@ -27,7 +29,7 @@ import (
)
func Test_debounce(t *testing.T) {
ch := make(chan fileMapping)
ch := make(chan fileEvent)
var (
ran int
got []string
@ -47,7 +49,7 @@ func Test_debounce(t *testing.T) {
return nil
})
for i := 0; i < 100; i++ {
ch <- fileMapping{Service: "test"}
ch <- fileEvent{Service: "test"}
}
assert.Equal(t, ran, 0)
clock.Advance(quietPeriod)
@ -79,8 +81,8 @@ func (t testWatcher) Errors() chan error {
}
func Test_sync(t *testing.T) {
needSync := make(chan fileMapping)
needRebuild := make(chan fileMapping)
needSync := make(chan sync.PathMapping)
needRebuild := make(chan fileEvent)
ctx, cancelFunc := context.WithCancel(context.TODO())
defer cancelFunc()
@ -119,7 +121,7 @@ func Test_sync(t *testing.T) {
watcher.Events() <- watch.NewFileEvent("/src/changed")
select {
case actual := <-needSync:
assert.DeepEqual(t, fileMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
assert.DeepEqual(t, sync.PathMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
case <-time.After(100 * time.Millisecond):
t.Error("timeout")
}