From 105a7c5b70812049e43fff96d4e3009e0ca53e4c Mon Sep 17 00:00:00 2001 From: Milas Bowman Date: Mon, 20 Mar 2023 13:25:19 -0400 Subject: [PATCH] watch: add file delete/rename handling This approach mimics Tilt's behavior[^1]: 1. At sync time, `stat` the path on host 2. If the path does not exist -> `rm` from container 3. If the path exists -> sync to container By handling things this way, we're always syncing based on the true state, regardless of what's happened in the interim. For example, a common pattern in POSIX tools is to create a file and then rename it over an existing file. Based on timing, this could be a sync, delete, sync (every file gets seen & processed) OR a delete, sync (by the the time we process the event, the "temp" file is already gone, so we just delete it from the container, where it never existed, but that's fine since we deletes are idempotent thanks to the `-f` flag on `rm`). Additionally, when syncing, if the `stat` call shows it's for a directory, we ignore it. Otherwise, duplicate, nested copies of the entire path could get synced in. (On some OSes, an event for the directory gets dispatched when a file inside of it is modified. In practice, I think we might want this pushed further down in the watching code, but since we're already `stat`ing the paths here now, it's a good place to handle it.) Lastly, there's some very light changes to the text when it does a full rebuild that will list out the (merged) set of paths that triggered it. We can continue to improve the output, but this is really helpful for understanding why it's rebuilding. [^1]: https://github.com/tilt-dev/tilt/blob/db7f887b0658ed042069dc0ff4cb266fe0596c23/internal/controllers/core/liveupdate/reconciler.go#L911 Signed-off-by: Milas Bowman --- pkg/compose/watch.go | 135 +++++++++++++++++++++++++++----------- pkg/compose/watch_test.go | 13 ++-- 2 files changed, 105 insertions(+), 43 deletions(-) diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 6b0588c1a..a72e00499 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -17,6 +17,9 @@ package compose import ( "context" "fmt" + "io/fs" + "os" + "path" "path/filepath" "strings" "time" @@ -50,9 +53,30 @@ type Trigger struct { const quietPeriod = 2 * time.Second -func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint:gocyclo - needRebuild := make(chan string) - needSync := make(chan api.CopyOptions, 5) +// fileMapping contains the Compose service and modified host system path. +// +// For file sync, the container path is also included. +// For rebuild, there is no container path, so it is always empty. +type fileMapping struct { + // service that the file event is for. + service string + // hostPath that was created/modified/deleted outside the container. + // + // This is the path as seen from the user's perspective, e.g. + // - C:\Users\moby\Documents\hello-world\main.go + // - /Users/moby/Documents/hello-world/main.go + hostPath string + // containerPath for the target file inside the container (only populated + // for sync events, not rebuild). + // + // This is the path as used in Docker CLI commands, e.g. + // - /workdir/main.go + containerPath string +} + +func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint:gocyclo + needRebuild := make(chan fileMapping) + needSync := make(chan fileMapping) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { @@ -120,38 +144,37 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv case <-ctx.Done(): return nil case event := <-watcher.Events(): - path := event.Path() + hostPath := event.Path() for _, trigger := range config.Watch { - logrus.Debugf("change detected on %s - comparing with %s", path, trigger.Path) - if watch.IsChild(trigger.Path, path) { - fmt.Fprintf(s.stderr(), "change detected on %s\n", path) + logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path) + if watch.IsChild(trigger.Path, hostPath) { + fmt.Fprintf(s.stderr(), "change detected on %s\n", hostPath) + + f := fileMapping{ + hostPath: hostPath, + service: name, + } switch trigger.Action { case WatchActionSync: - logrus.Debugf("modified file %s triggered sync", path) - rel, err := filepath.Rel(trigger.Path, path) + logrus.Debugf("modified file %s triggered sync", hostPath) + rel, err := filepath.Rel(trigger.Path, hostPath) if err != nil { return err } - dest := filepath.Join(trigger.Target, rel) - needSync <- api.CopyOptions{ - Source: path, - Destination: fmt.Sprintf("%s:%s", name, dest), - } + // always use Unix-style paths for inside the container + f.containerPath = path.Join(trigger.Target, rel) + needSync <- f case WatchActionRebuild: - logrus.Debugf("modified file %s requires image to be rebuilt", path) - needRebuild <- name + logrus.Debugf("modified file %s requires image to be rebuilt", hostPath) + needRebuild <- f default: return fmt.Errorf("watch action %q is not supported", trigger) } continue WATCH } } - - // default - needRebuild <- name - case err := <-watcher.Errors(): return err } @@ -183,11 +206,25 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) return config, nil } -func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services []string) { - return func(services []string) { - fmt.Fprintf(s.stderr(), "Updating %s after changes were detected\n", strings.Join(services, ", ")) +func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) { + return func(services rebuildServices) { + serviceNames := make([]string, 0, len(services)) + allPaths := make(utils.Set[string]) + for serviceName, paths := range services { + serviceNames = append(serviceNames, serviceName) + for p := range paths { + allPaths.Add(p) + } + } + + fmt.Fprintf( + s.stderr(), + "Rebuilding %s after changes were detected:%s\n", + strings.Join(serviceNames, ", "), + strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "), + ) imageIds, err := s.build(ctx, project, api.BuildOptions{ - Services: services, + Services: serviceNames, }) if err != nil { fmt.Fprintf(s.stderr(), "Build failed\n") @@ -201,11 +238,11 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje err = s.Up(ctx, project, api.UpOptions{ Create: api.CreateOptions{ - Services: services, + Services: serviceNames, Inherit: true, }, Start: api.StartOptions{ - Services: services, + Services: serviceNames, Project: project, }, }) @@ -215,39 +252,61 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje } } -func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync chan api.CopyOptions) func() error { +func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error { return func() error { for { select { case <-ctx.Done(): return nil case opt := <-needSync: - err := s.Copy(ctx, project.Name, opt) - if err != nil { - return err + if fi, statErr := os.Stat(opt.hostPath); statErr == nil && !fi.IsDir() { + err := s.Copy(ctx, project.Name, api.CopyOptions{ + Source: opt.hostPath, + Destination: fmt.Sprintf("%s:%s", opt.service, opt.containerPath), + }) + if err != nil { + return err + } + fmt.Fprintf(s.stderr(), "%s updated\n", opt.containerPath) + } else if errors.Is(statErr, fs.ErrNotExist) { + _, err := s.Exec(ctx, project.Name, api.RunOptions{ + Service: opt.service, + Command: []string{"rm", "-rf", opt.containerPath}, + Index: 1, + }) + if err != nil { + logrus.Warnf("failed to delete %q from %s: %v", opt.containerPath, opt.service, err) + } + fmt.Fprintf(s.stderr(), "%s deleted from container\n", opt.containerPath) } - fmt.Fprintf(s.stderr(), "%s updated\n", opt.Destination) } } } } -func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input chan string, fn func(services []string)) { - services := utils.Set[string]{} +type rebuildServices map[string]utils.Set[string] + +func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) { + services := make(rebuildServices) t := clock.AfterFunc(delay, func() { if len(services) > 0 { - refresh := services.Elements() - services.Clear() - fn(refresh) + fn(services) + // TODO(milas): this is a data race! + services = make(rebuildServices) } }) for { select { case <-ctx.Done(): return - case service := <-input: + case e := <-input: t.Reset(delay) - services.Add(service) + svc, ok := services[e.service] + if !ok { + svc = make(utils.Set[string]) + services[e.service] = svc + } + svc.Add(e.hostPath) } } } diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index cbcb5acb7..08e50549e 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -24,24 +24,27 @@ import ( ) func Test_debounce(t *testing.T) { - ch := make(chan string) + ch := make(chan fileMapping) var ( ran int got []string ) clock := clockwork.NewFakeClock() - ctx, stop := context.WithCancel(context.TODO()) + ctx, stop := context.WithCancel(context.Background()) + t.Cleanup(stop) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - debounce(ctx, clock, quietPeriod, ch, func(services []string) { - got = append(got, services...) + debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) { + for svc := range services { + got = append(got, svc) + } ran++ stop() }) return nil }) for i := 0; i < 100; i++ { - ch <- "test" + ch <- fileMapping{service: "test"} } assert.Equal(t, ran, 0) clock.Advance(quietPeriod)