Merge pull request #10386 from milas/fw-renames

watch: add file delete/rename handling
This commit is contained in:
Guillaume Lours 2023-03-21 13:48:56 +01:00 committed by GitHub
commit bef9c48a1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 105 additions and 43 deletions

View File

@ -17,6 +17,9 @@ package compose
import ( import (
"context" "context"
"fmt" "fmt"
"io/fs"
"os"
"path"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
@ -50,9 +53,30 @@ type Trigger struct {
const quietPeriod = 2 * time.Second const quietPeriod = 2 * time.Second
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint:gocyclo // fileMapping contains the Compose service and modified host system path.
needRebuild := make(chan string) //
needSync := make(chan api.CopyOptions, 5) // For file sync, the container path is also included.
// For rebuild, there is no container path, so it is always empty.
type fileMapping struct {
// service that the file event is for.
service string
// hostPath that was created/modified/deleted outside the container.
//
// This is the path as seen from the user's perspective, e.g.
// - C:\Users\moby\Documents\hello-world\main.go
// - /Users/moby/Documents/hello-world/main.go
hostPath string
// containerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
containerPath string
}
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint:gocyclo
needRebuild := make(chan fileMapping)
needSync := make(chan fileMapping)
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { eg.Go(func() error {
@ -120,38 +144,37 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
case event := <-watcher.Events(): case event := <-watcher.Events():
path := event.Path() hostPath := event.Path()
for _, trigger := range config.Watch { for _, trigger := range config.Watch {
logrus.Debugf("change detected on %s - comparing with %s", path, trigger.Path) logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path)
if watch.IsChild(trigger.Path, path) { if watch.IsChild(trigger.Path, hostPath) {
fmt.Fprintf(s.stderr(), "change detected on %s\n", path) fmt.Fprintf(s.stderr(), "change detected on %s\n", hostPath)
f := fileMapping{
hostPath: hostPath,
service: name,
}
switch trigger.Action { switch trigger.Action {
case WatchActionSync: case WatchActionSync:
logrus.Debugf("modified file %s triggered sync", path) logrus.Debugf("modified file %s triggered sync", hostPath)
rel, err := filepath.Rel(trigger.Path, path) rel, err := filepath.Rel(trigger.Path, hostPath)
if err != nil { if err != nil {
return err return err
} }
dest := filepath.Join(trigger.Target, rel) // always use Unix-style paths for inside the container
needSync <- api.CopyOptions{ f.containerPath = path.Join(trigger.Target, rel)
Source: path, needSync <- f
Destination: fmt.Sprintf("%s:%s", name, dest),
}
case WatchActionRebuild: case WatchActionRebuild:
logrus.Debugf("modified file %s requires image to be rebuilt", path) logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
needRebuild <- name needRebuild <- f
default: default:
return fmt.Errorf("watch action %q is not supported", trigger) return fmt.Errorf("watch action %q is not supported", trigger)
} }
continue WATCH continue WATCH
} }
} }
// default
needRebuild <- name
case err := <-watcher.Errors(): case err := <-watcher.Errors():
return err return err
} }
@ -183,11 +206,25 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
return config, nil return config, nil
} }
func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services []string) { func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) {
return func(services []string) { return func(services rebuildServices) {
fmt.Fprintf(s.stderr(), "Updating %s after changes were detected\n", strings.Join(services, ", ")) serviceNames := make([]string, 0, len(services))
allPaths := make(utils.Set[string])
for serviceName, paths := range services {
serviceNames = append(serviceNames, serviceName)
for p := range paths {
allPaths.Add(p)
}
}
fmt.Fprintf(
s.stderr(),
"Rebuilding %s after changes were detected:%s\n",
strings.Join(serviceNames, ", "),
strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "),
)
imageIds, err := s.build(ctx, project, api.BuildOptions{ imageIds, err := s.build(ctx, project, api.BuildOptions{
Services: services, Services: serviceNames,
}) })
if err != nil { if err != nil {
fmt.Fprintf(s.stderr(), "Build failed\n") fmt.Fprintf(s.stderr(), "Build failed\n")
@ -201,11 +238,11 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
err = s.Up(ctx, project, api.UpOptions{ err = s.Up(ctx, project, api.UpOptions{
Create: api.CreateOptions{ Create: api.CreateOptions{
Services: services, Services: serviceNames,
Inherit: true, Inherit: true,
}, },
Start: api.StartOptions{ Start: api.StartOptions{
Services: services, Services: serviceNames,
Project: project, Project: project,
}, },
}) })
@ -215,39 +252,61 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
} }
} }
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync chan api.CopyOptions) func() error { func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
return func() error { return func() error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
case opt := <-needSync: case opt := <-needSync:
err := s.Copy(ctx, project.Name, opt) if fi, statErr := os.Stat(opt.hostPath); statErr == nil && !fi.IsDir() {
err := s.Copy(ctx, project.Name, api.CopyOptions{
Source: opt.hostPath,
Destination: fmt.Sprintf("%s:%s", opt.service, opt.containerPath),
})
if err != nil { if err != nil {
return err return err
} }
fmt.Fprintf(s.stderr(), "%s updated\n", opt.Destination) fmt.Fprintf(s.stderr(), "%s updated\n", opt.containerPath)
} else if errors.Is(statErr, fs.ErrNotExist) {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.service,
Command: []string{"rm", "-rf", opt.containerPath},
Index: 1,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", opt.containerPath, opt.service, err)
}
fmt.Fprintf(s.stderr(), "%s deleted from container\n", opt.containerPath)
}
} }
} }
} }
} }
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input chan string, fn func(services []string)) { type rebuildServices map[string]utils.Set[string]
services := utils.Set[string]{}
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
services := make(rebuildServices)
t := clock.AfterFunc(delay, func() { t := clock.AfterFunc(delay, func() {
if len(services) > 0 { if len(services) > 0 {
refresh := services.Elements() fn(services)
services.Clear() // TODO(milas): this is a data race!
fn(refresh) services = make(rebuildServices)
} }
}) })
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case service := <-input: case e := <-input:
t.Reset(delay) t.Reset(delay)
services.Add(service) svc, ok := services[e.service]
if !ok {
svc = make(utils.Set[string])
services[e.service] = svc
}
svc.Add(e.hostPath)
} }
} }
} }

View File

@ -24,24 +24,27 @@ import (
) )
func Test_debounce(t *testing.T) { func Test_debounce(t *testing.T) {
ch := make(chan string) ch := make(chan fileMapping)
var ( var (
ran int ran int
got []string got []string
) )
clock := clockwork.NewFakeClock() clock := clockwork.NewFakeClock()
ctx, stop := context.WithCancel(context.TODO()) ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { eg.Go(func() error {
debounce(ctx, clock, quietPeriod, ch, func(services []string) { debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) {
got = append(got, services...) for svc := range services {
got = append(got, svc)
}
ran++ ran++
stop() stop()
}) })
return nil return nil
}) })
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
ch <- "test" ch <- fileMapping{service: "test"}
} }
assert.Equal(t, ran, 0) assert.Equal(t, ran, 0)
clock.Advance(quietPeriod) clock.Advance(quietPeriod)