diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 6b0588c1a..a72e00499 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -17,6 +17,9 @@ package compose import ( "context" "fmt" + "io/fs" + "os" + "path" "path/filepath" "strings" "time" @@ -50,9 +53,30 @@ type Trigger struct { const quietPeriod = 2 * time.Second -func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint:gocyclo - needRebuild := make(chan string) - needSync := make(chan api.CopyOptions, 5) +// fileMapping contains the Compose service and modified host system path. +// +// For file sync, the container path is also included. +// For rebuild, there is no container path, so it is always empty. +type fileMapping struct { + // service that the file event is for. + service string + // hostPath that was created/modified/deleted outside the container. + // + // This is the path as seen from the user's perspective, e.g. + // - C:\Users\moby\Documents\hello-world\main.go + // - /Users/moby/Documents/hello-world/main.go + hostPath string + // containerPath for the target file inside the container (only populated + // for sync events, not rebuild). + // + // This is the path as used in Docker CLI commands, e.g. + // - /workdir/main.go + containerPath string +} + +func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint:gocyclo + needRebuild := make(chan fileMapping) + needSync := make(chan fileMapping) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { @@ -120,38 +144,37 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv case <-ctx.Done(): return nil case event := <-watcher.Events(): - path := event.Path() + hostPath := event.Path() for _, trigger := range config.Watch { - logrus.Debugf("change detected on %s - comparing with %s", path, trigger.Path) - if watch.IsChild(trigger.Path, path) { - fmt.Fprintf(s.stderr(), "change detected on %s\n", path) + logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path) + if watch.IsChild(trigger.Path, hostPath) { + fmt.Fprintf(s.stderr(), "change detected on %s\n", hostPath) + + f := fileMapping{ + hostPath: hostPath, + service: name, + } switch trigger.Action { case WatchActionSync: - logrus.Debugf("modified file %s triggered sync", path) - rel, err := filepath.Rel(trigger.Path, path) + logrus.Debugf("modified file %s triggered sync", hostPath) + rel, err := filepath.Rel(trigger.Path, hostPath) if err != nil { return err } - dest := filepath.Join(trigger.Target, rel) - needSync <- api.CopyOptions{ - Source: path, - Destination: fmt.Sprintf("%s:%s", name, dest), - } + // always use Unix-style paths for inside the container + f.containerPath = path.Join(trigger.Target, rel) + needSync <- f case WatchActionRebuild: - logrus.Debugf("modified file %s requires image to be rebuilt", path) - needRebuild <- name + logrus.Debugf("modified file %s requires image to be rebuilt", hostPath) + needRebuild <- f default: return fmt.Errorf("watch action %q is not supported", trigger) } continue WATCH } } - - // default - needRebuild <- name - case err := <-watcher.Errors(): return err } @@ -183,11 +206,25 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) return config, nil } -func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services []string) { - return func(services []string) { - fmt.Fprintf(s.stderr(), "Updating %s after changes were detected\n", strings.Join(services, ", ")) +func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) { + return func(services rebuildServices) { + serviceNames := make([]string, 0, len(services)) + allPaths := make(utils.Set[string]) + for serviceName, paths := range services { + serviceNames = append(serviceNames, serviceName) + for p := range paths { + allPaths.Add(p) + } + } + + fmt.Fprintf( + s.stderr(), + "Rebuilding %s after changes were detected:%s\n", + strings.Join(serviceNames, ", "), + strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "), + ) imageIds, err := s.build(ctx, project, api.BuildOptions{ - Services: services, + Services: serviceNames, }) if err != nil { fmt.Fprintf(s.stderr(), "Build failed\n") @@ -201,11 +238,11 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje err = s.Up(ctx, project, api.UpOptions{ Create: api.CreateOptions{ - Services: services, + Services: serviceNames, Inherit: true, }, Start: api.StartOptions{ - Services: services, + Services: serviceNames, Project: project, }, }) @@ -215,39 +252,61 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje } } -func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync chan api.CopyOptions) func() error { +func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error { return func() error { for { select { case <-ctx.Done(): return nil case opt := <-needSync: - err := s.Copy(ctx, project.Name, opt) - if err != nil { - return err + if fi, statErr := os.Stat(opt.hostPath); statErr == nil && !fi.IsDir() { + err := s.Copy(ctx, project.Name, api.CopyOptions{ + Source: opt.hostPath, + Destination: fmt.Sprintf("%s:%s", opt.service, opt.containerPath), + }) + if err != nil { + return err + } + fmt.Fprintf(s.stderr(), "%s updated\n", opt.containerPath) + } else if errors.Is(statErr, fs.ErrNotExist) { + _, err := s.Exec(ctx, project.Name, api.RunOptions{ + Service: opt.service, + Command: []string{"rm", "-rf", opt.containerPath}, + Index: 1, + }) + if err != nil { + logrus.Warnf("failed to delete %q from %s: %v", opt.containerPath, opt.service, err) + } + fmt.Fprintf(s.stderr(), "%s deleted from container\n", opt.containerPath) } - fmt.Fprintf(s.stderr(), "%s updated\n", opt.Destination) } } } } -func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input chan string, fn func(services []string)) { - services := utils.Set[string]{} +type rebuildServices map[string]utils.Set[string] + +func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) { + services := make(rebuildServices) t := clock.AfterFunc(delay, func() { if len(services) > 0 { - refresh := services.Elements() - services.Clear() - fn(refresh) + fn(services) + // TODO(milas): this is a data race! + services = make(rebuildServices) } }) for { select { case <-ctx.Done(): return - case service := <-input: + case e := <-input: t.Reset(delay) - services.Add(service) + svc, ok := services[e.service] + if !ok { + svc = make(utils.Set[string]) + services[e.service] = svc + } + svc.Add(e.hostPath) } } } diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index cbcb5acb7..08e50549e 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -24,24 +24,27 @@ import ( ) func Test_debounce(t *testing.T) { - ch := make(chan string) + ch := make(chan fileMapping) var ( ran int got []string ) clock := clockwork.NewFakeClock() - ctx, stop := context.WithCancel(context.TODO()) + ctx, stop := context.WithCancel(context.Background()) + t.Cleanup(stop) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - debounce(ctx, clock, quietPeriod, ch, func(services []string) { - got = append(got, services...) + debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) { + for svc := range services { + got = append(got, svc) + } ran++ stop() }) return nil }) for i := 0; i < 100; i++ { - ch <- "test" + ch <- fileMapping{service: "test"} } assert.Equal(t, ran, 0) clock.Advance(quietPeriod)