From 3b0742fd576e0cd5d320b35cd9f1a69859d15bbc Mon Sep 17 00:00:00 2001 From: Milas Bowman Date: Thu, 3 Aug 2023 14:53:02 -0400 Subject: [PATCH] watch: batch & de-duplicate file events (#10865) Adjust the debouncing logic so that it applies to all inbound file events, regardless of whether they match a sync or rebuild rule. When the batch is flushed out, if any event for the service is a rebuild event, then the service is rebuilt and all sync events for the batch are ignored. If _all_ events in the batch are sync events, then a sync is triggered, passing the entire batch at once. This provides a substantial performance win for the new `tar`-based implementation, as it can efficiently transfer the changes in bulk. Additionally, this helps with jitter, e.g. it's not uncommon for there to be double-writes in quick succession to a file, so even if there's not many files being modified at once, it can still prevent some unnecessary transfers. Signed-off-by: Milas Bowman --- internal/sync/docker_cp.go | 10 +- internal/sync/shared.go | 2 - internal/sync/tar.go | 5 +- pkg/compose/compose.go | 4 + pkg/compose/watch.go | 378 +++++++++++++++++++++---------------- pkg/compose/watch_test.go | 218 ++++++++++++--------- pkg/e2e/watch_test.go | 3 +- 7 files changed, 362 insertions(+), 258 deletions(-) diff --git a/internal/sync/docker_cp.go b/internal/sync/docker_cp.go index 8e91c94fa..ae5a77ad6 100644 --- a/internal/sync/docker_cp.go +++ b/internal/sync/docker_cp.go @@ -71,19 +71,19 @@ func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, path if fi.IsDir() { for i := 1; i <= scale; i++ { _, err := d.client.Exec(ctx, d.projectName, api.RunOptions{ - Service: pathMapping.Service, + Service: service.Name, Command: []string{"mkdir", "-p", pathMapping.ContainerPath}, Index: i, }) if err != nil { - logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err) + logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err) } } fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath) } else { err := d.client.Copy(ctx, d.projectName, api.CopyOptions{ Source: pathMapping.HostPath, - Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath), + Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath), }) if err != nil { return err @@ -93,12 +93,12 @@ func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, path } else if errors.Is(statErr, fs.ErrNotExist) { for i := 1; i <= scale; i++ { _, err := d.client.Exec(ctx, d.projectName, api.RunOptions{ - Service: pathMapping.Service, + Service: service.Name, Command: []string{"rm", "-rf", pathMapping.ContainerPath}, Index: i, }) if err != nil { - logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err) + logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err) } } fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath) diff --git a/internal/sync/shared.go b/internal/sync/shared.go index 0fb15d49e..2ff9b434c 100644 --- a/internal/sync/shared.go +++ b/internal/sync/shared.go @@ -22,8 +22,6 @@ import ( // PathMapping contains the Compose service and modified host system path. type PathMapping struct { - // Service that the file event is for. - Service string // HostPath that was created/modified/deleted outside the container. // // This is the path as seen from the user's perspective, e.g. diff --git a/internal/sync/tar.go b/internal/sync/tar.go index ff3990ebb..c16444c3b 100644 --- a/internal/sync/tar.go +++ b/internal/sync/tar.go @@ -121,9 +121,7 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat } type ArchiveBuilder struct { - tw *tar.Writer - paths []string // local paths archived - + tw *tar.Writer // A shared I/O buffer to help with file copying. copyBuf *bytes.Buffer } @@ -168,7 +166,6 @@ func (a *ArchiveBuilder) ArchivePathsIfExist(paths []PathMapping) error { if err != nil { return fmt.Errorf("archiving %q: %w", entry.path, err) } - a.paths = append(a.paths, entry.path) } return nil } diff --git a/pkg/compose/compose.go b/pkg/compose/compose.go index aa0ca9f22..33f93397a 100644 --- a/pkg/compose/compose.go +++ b/pkg/compose/compose.go @@ -26,6 +26,8 @@ import ( "strings" "sync" + "github.com/jonboulle/clockwork" + "github.com/docker/docker/api/types/volume" "github.com/compose-spec/compose-go/types" @@ -58,6 +60,7 @@ func init() { func NewComposeService(dockerCli command.Cli) api.Service { return &composeService{ dockerCli: dockerCli, + clock: clockwork.NewRealClock(), maxConcurrency: -1, dryRun: false, } @@ -65,6 +68,7 @@ func NewComposeService(dockerCli command.Cli) api.Service { type composeService struct { dockerCli command.Cli + clock clockwork.Clock maxConcurrency int dryRun bool } diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index f49bcbfe4..cc61878fa 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -21,6 +21,7 @@ import ( "os" "path" "path/filepath" + "sort" "strconv" "strings" "time" @@ -37,7 +38,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/docker/compose/v2/pkg/api" - "github.com/docker/compose/v2/pkg/utils" "github.com/docker/compose/v2/pkg/watch" ) @@ -45,9 +45,11 @@ type DevelopmentConfig struct { Watch []Trigger `json:"watch,omitempty"` } +type WatchAction string + const ( - WatchActionSync = "sync" - WatchActionRebuild = "rebuild" + WatchActionSync WatchAction = "sync" + WatchActionRebuild WatchAction = "rebuild" ) type Trigger struct { @@ -57,44 +59,34 @@ type Trigger struct { Ignore []string `json:"ignore,omitempty"` } -const quietPeriod = 2 * time.Second +const quietPeriod = 500 * time.Millisecond // fileEvent contains the Compose service and modified host system path. type fileEvent struct { - // Service that the file event is for. - Service string - // HostPath that was created/modified/deleted outside the container. - // - // This is the path as seen from the user's perspective, e.g. - // - C:\Users\moby\Documents\hello-world\main.go - // - /Users/moby/Documents/hello-world/main.go - HostPath string + sync.PathMapping + Action WatchAction } func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo - needRebuild := make(chan fileEvent) - needSync := make(chan sync.PathMapping) - _, err := s.prepareProjectForBuild(project, nil) if err != nil { return err } + var syncer sync.Syncer + if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar { + syncer = sync.NewTar(project.Name, tarDockerClient{s: s}) + } else { + syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo()) + } - eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - clock := clockwork.NewRealClock() - debounce(ctx, clock, quietPeriod, needRebuild, s.makeRebuildFn(ctx, project)) - return nil - }) - - eg.Go(s.makeSyncFn(ctx, project, needSync)) - - ss, err := project.GetServices(services...) - if err != nil { + if err := project.ForServices(services); err != nil { return err } + + eg, ctx := errgroup.WithContext(ctx) watching := false - for _, service := range ss { + for i := range project.Services { + service := project.Services[i] config, err := loadDevelopmentConfig(service, project) if err != nil { return err @@ -118,7 +110,10 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv continue } - name := service.Name + // set the service to always be built - watch triggers `Up()` when it receives a rebuild event + service.PullPolicy = types.PullPolicyBuild + project.Services[i] = service + dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context) if err != nil { return err @@ -160,7 +155,7 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv eg.Go(func() error { defer watcher.Close() //nolint:errcheck - return s.watch(ctx, name, watcher, config.Watch, needSync, needRebuild) + return s.watch(ctx, project, service.Name, watcher, syncer, config.Watch) }) } @@ -171,7 +166,17 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv return eg.Wait() } -func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error { +func (s *composeService) watch( + ctx context.Context, + project *types.Project, + name string, + watcher watch.Notify, + syncer sync.Syncer, + triggers []Trigger, +) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + ignores := make([]watch.PathMatcher, len(triggers)) for i, trigger := range triggers { ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore) @@ -181,62 +186,82 @@ func (s *composeService) watch(ctx context.Context, name string, watcher watch.N ignores[i] = ignore } -WATCH: + events := make(chan fileEvent) + batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events) + go func() { + for { + select { + case <-ctx.Done(): + return + case batch := <-batchEvents: + start := time.Now() + logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch)) + if err := s.handleWatchBatch(ctx, project, name, batch, syncer); err != nil { + logrus.Warnf("Error handling changed files for service %s: %v", name, err) + } + logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]", + name, time.Since(start), len(batch)) + } + } + }() + for { select { case <-ctx.Done(): return nil - case event := <-watcher.Events(): - hostPath := event.Path() - - for i, trigger := range triggers { - logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path) - if watch.IsChild(trigger.Path, hostPath) { - match, err := ignores[i].Matches(hostPath) - if err != nil { - logrus.Warnf("error ignore matching %q: %v", hostPath, err) - return err - } - - if match { - logrus.Debugf("%s is matching ignore pattern", hostPath) - continue - } - - logrus.Infof("change for %q", hostPath) - fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath) - - switch trigger.Action { - case WatchActionSync: - logrus.Debugf("modified file %s triggered sync", hostPath) - rel, err := filepath.Rel(trigger.Path, hostPath) - if err != nil { - return err - } - needSync <- sync.PathMapping{ - Service: name, - HostPath: hostPath, - // always use Unix-style paths for inside the container - ContainerPath: path.Join(trigger.Target, rel), - } - case WatchActionRebuild: - logrus.Debugf("modified file %s requires image to be rebuilt", hostPath) - needRebuild <- fileEvent{ - HostPath: hostPath, - Service: name, - } - default: - return fmt.Errorf("watch action %q is not supported", trigger) - } - continue WATCH - } - } case err := <-watcher.Errors(): return err + case event := <-watcher.Events(): + hostPath := event.Path() + for i, trigger := range triggers { + logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path) + if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil { + events <- *fileEvent + } + } } } } +// maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore +// rules. +// +// Any errors are logged as warnings and nil (no file event) is returned. +func maybeFileEvent(trigger Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent { + if !watch.IsChild(trigger.Path, hostPath) { + return nil + } + isIgnored, err := ignore.Matches(hostPath) + if err != nil { + logrus.Warnf("error ignore matching %q: %v", hostPath, err) + return nil + } + + if isIgnored { + logrus.Debugf("%s is matching ignore pattern", hostPath) + return nil + } + + var containerPath string + if trigger.Target != "" { + rel, err := filepath.Rel(trigger.Path, hostPath) + if err != nil { + logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err) + return nil + } + // always use Unix-style paths for inside the container + containerPath = path.Join(trigger.Target, rel) + } + + return &fileEvent{ + Action: WatchAction(trigger.Action), + PathMapping: sync.PathMapping{ + HostPath: hostPath, + ContainerPath: containerPath, + }, + } +} + func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*DevelopmentConfig, error) { var config DevelopmentConfig y, ok := service.Extensions["x-develop"] @@ -265,7 +290,7 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) return nil, errors.New("watch rules MUST define a path") } - if trigger.Action == WatchActionRebuild && service.Build == nil { + if trigger.Action == string(WatchActionRebuild) && service.Build == nil { return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name) } @@ -274,98 +299,54 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) return &config, nil } -func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) { - for i, service := range project.Services { - service.PullPolicy = types.PullPolicyBuild - project.Services[i] = service - } - return func(services rebuildServices) { - serviceNames := make([]string, 0, len(services)) - allPaths := make(utils.Set[string]) - for serviceName, paths := range services { - serviceNames = append(serviceNames, serviceName) - for p := range paths { - allPaths.Add(p) +// batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned +// channel. +// +// The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel. +func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent { + out := make(chan []fileEvent) + go func() { + defer close(out) + seen := make(map[fileEvent]time.Time) + flushEvents := func() { + if len(seen) == 0 { + return } + events := make([]fileEvent, 0, len(seen)) + for e := range seen { + events = append(events, e) + } + // sort batch by oldest -> newest + // (if an event is seen > 1 per batch, it gets the latest timestamp) + sort.SliceStable(events, func(i, j int) bool { + x := events[i] + y := events[j] + return seen[x].Before(seen[y]) + }) + out <- events + seen = make(map[fileEvent]time.Time) } - fmt.Fprintf( - s.stdinfo(), - "Rebuilding %s after changes were detected:%s\n", - strings.Join(serviceNames, ", "), - strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "), - ) - err := s.Up(ctx, project, api.UpOptions{ - Create: api.CreateOptions{ - Services: serviceNames, - Inherit: true, - }, - Start: api.StartOptions{ - Services: serviceNames, - Project: project, - }, - }) - if err != nil { - fmt.Fprintf(s.stderr(), "Application failed to start after update\n") - } - } -} - -func (s *composeService) makeSyncFn( - ctx context.Context, - project *types.Project, - needSync <-chan sync.PathMapping, -) func() error { - var syncer sync.Syncer - if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar { - syncer = sync.NewTar(project.Name, tarDockerClient{s: s}) - } else { - syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo()) - } - - return func() error { + t := clock.NewTicker(delay) + defer t.Stop() for { select { case <-ctx.Done(): - return nil - case op := <-needSync: - service, err := project.GetService(op.Service) - if err != nil { - return err - } - if err := syncer.Sync(ctx, service, []sync.PathMapping{op}); err != nil { - return err + return + case <-t.Chan(): + flushEvents() + case e, ok := <-input: + if !ok { + // input channel was closed + flushEvents() + return } + seen[e] = time.Now() + t.Reset(delay) } } - } -} - -type rebuildServices map[string]utils.Set[string] - -func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) { - services := make(rebuildServices) - t := clock.NewTimer(delay) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.Chan(): - if len(services) > 0 { - go fn(services) - services = make(rebuildServices) - } - case e := <-input: - t.Reset(delay) - svc, ok := services[e.Service] - if !ok { - svc = make(utils.Set[string]) - services[e.Service] = svc - } - svc.Add(e.HostPath) - } - } + }() + return out } func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool { @@ -440,8 +421,85 @@ func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []str if err != nil { return err } + if execResult.Running { + return errors.New("process still running") + } if execResult.ExitCode != 0 { return fmt.Errorf("exit code %d", execResult.ExitCode) } return nil } + +func (s *composeService) handleWatchBatch( + ctx context.Context, + project *types.Project, + serviceName string, + batch []fileEvent, + syncer sync.Syncer, +) error { + pathMappings := make([]sync.PathMapping, len(batch)) + for i := range batch { + if batch[i].Action == WatchActionRebuild { + fmt.Fprintf( + s.stdinfo(), + "Rebuilding %s after changes were detected:%s\n", + serviceName, + strings.Join(append([]string{""}, batch[i].HostPath), "\n - "), + ) + err := s.Up(ctx, project, api.UpOptions{ + Create: api.CreateOptions{ + Services: []string{serviceName}, + Inherit: true, + }, + Start: api.StartOptions{ + Services: []string{serviceName}, + Project: project, + }, + }) + if err != nil { + fmt.Fprintf(s.stderr(), "Application failed to start after update\n") + } + return nil + } + pathMappings[i] = batch[i].PathMapping + } + + writeWatchSyncMessage(s.stdinfo(), serviceName, pathMappings) + + service, err := project.GetService(serviceName) + if err != nil { + return err + } + if err := syncer.Sync(ctx, service, pathMappings); err != nil { + return err + } + return nil +} + +// writeWatchSyncMessage prints out a message about the sync for the changed paths. +func writeWatchSyncMessage(w io.Writer, serviceName string, pathMappings []sync.PathMapping) { + const maxPathsToShow = 10 + if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) { + hostPathsToSync := make([]string, len(pathMappings)) + for i := range pathMappings { + hostPathsToSync[i] = pathMappings[i].HostPath + } + fmt.Fprintf( + w, + "Syncing %s after changes were detected:%s\n", + serviceName, + strings.Join(append([]string{""}, hostPathsToSync...), "\n - "), + ) + } else { + hostPathsToSync := make([]string, len(pathMappings)) + for i := range pathMappings { + hostPathsToSync[i] = pathMappings[i].HostPath + } + fmt.Fprintf( + w, + "Syncing %s after %d changes were detected\n", + serviceName, + len(pathMappings), + ) + } +} diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index 3f359f7d4..a6bdc7be0 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -16,47 +16,60 @@ package compose import ( "context" + "os" "testing" "time" + "github.com/compose-spec/compose-go/types" + "github.com/docker/compose/v2/pkg/mocks" + moby "github.com/docker/docker/api/types" + "github.com/golang/mock/gomock" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + "github.com/docker/compose/v2/internal/sync" - "github.com/docker/cli/cli/command" "github.com/docker/compose/v2/pkg/watch" - "github.com/jonboulle/clockwork" - "golang.org/x/sync/errgroup" "gotest.tools/v3/assert" ) -func Test_debounce(t *testing.T) { +func TestDebounceBatching(t *testing.T) { ch := make(chan fileEvent) - var ( - ran int - got []string - ) clock := clockwork.NewFakeClock() ctx, stop := context.WithCancel(context.Background()) t.Cleanup(stop) - eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) { - for svc := range services { - got = append(got, svc) - } - ran++ - stop() - }) - return nil - }) + + eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch) for i := 0; i < 100; i++ { - ch <- fileEvent{Service: "test"} + var action WatchAction = "a" + if i%2 == 0 { + action = "b" + } + ch <- fileEvent{Action: action} } - assert.Equal(t, ran, 0) + // we sent 100 events + the debouncer + clock.BlockUntil(101) clock.Advance(quietPeriod) - err := eg.Wait() - assert.NilError(t, err) - assert.Equal(t, ran, 1) - assert.DeepEqual(t, got, []string{"test"}) + select { + case batch := <-eventBatchCh: + require.ElementsMatch(t, batch, []fileEvent{ + {Action: "a"}, + {Action: "b"}, + }) + case <-time.After(50 * time.Millisecond): + t.Fatal("timed out waiting for events") + } + clock.BlockUntil(1) + clock.Advance(quietPeriod) + + // there should only be a single batch + select { + case batch := <-eventBatchCh: + t.Fatalf("unexpected events: %v", batch) + case <-time.After(50 * time.Millisecond): + // channel is empty + } } type testWatcher struct { @@ -80,73 +93,106 @@ func (t testWatcher) Errors() chan error { return t.errors } -func Test_sync(t *testing.T) { - needSync := make(chan sync.PathMapping) - needRebuild := make(chan fileEvent) - ctx, cancelFunc := context.WithCancel(context.TODO()) - defer cancelFunc() +func TestWatch_Sync(t *testing.T) { + mockCtrl := gomock.NewController(t) + cli := mocks.NewMockCli(mockCtrl) + cli.EXPECT().Err().Return(os.Stderr).AnyTimes() + apiClient := mocks.NewMockAPIClient(mockCtrl) + apiClient.EXPECT().ContainerList(gomock.Any(), gomock.Any()).Return([]moby.Container{ + testContainer("test", "123", false), + }, nil).AnyTimes() + cli.EXPECT().Client().Return(apiClient).AnyTimes() - run := func() watch.Notify { - watcher := testWatcher{ - events: make(chan watch.FileEvent, 1), - errors: make(chan error), - } + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) - go func() { - cli, err := command.NewDockerCli() - assert.NilError(t, err) - - service := composeService{ - dockerCli: cli, - } - err = service.watch(ctx, "test", watcher, []Trigger{ - { - Path: "/src", - Action: "sync", - Target: "/work", - Ignore: []string{"ignore"}, - }, - { - Path: "/", - Action: "rebuild", - }, - }, needSync, needRebuild) - assert.NilError(t, err) - }() - return watcher + proj := types.Project{ + Services: []types.ServiceConfig{ + { + Name: "test", + }, + }, } - t.Run("synchronize file", func(t *testing.T) { - watcher := run() - watcher.Events() <- watch.NewFileEvent("/src/changed") - select { - case actual := <-needSync: - assert.DeepEqual(t, sync.PathMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual) - case <-time.After(100 * time.Millisecond): - t.Error("timeout") - } - }) + watcher := testWatcher{ + events: make(chan watch.FileEvent), + errors: make(chan error), + } - t.Run("ignore", func(t *testing.T) { - watcher := run() - watcher.Events() <- watch.NewFileEvent("/src/ignore") - select { - case <-needSync: - t.Error("file event should have been ignored") - case <-time.After(100 * time.Millisecond): - // expected + syncer := newFakeSyncer() + clock := clockwork.NewFakeClock() + go func() { + service := composeService{ + dockerCli: cli, + clock: clock, } - }) + err := service.watch(ctx, &proj, "test", watcher, syncer, []Trigger{ + { + Path: "/sync", + Action: "sync", + Target: "/work", + Ignore: []string{"ignore"}, + }, + { + Path: "/rebuild", + Action: "rebuild", + }, + }) + assert.NilError(t, err) + }() - t.Run("rebuild", func(t *testing.T) { - watcher := run() - watcher.Events() <- watch.NewFileEvent("/dependencies.yaml") - select { - case event := <-needRebuild: - assert.Equal(t, "test", event.Service) - case <-time.After(100 * time.Millisecond): - t.Error("timeout") - } - }) + watcher.Events() <- watch.NewFileEvent("/sync/changed") + watcher.Events() <- watch.NewFileEvent("/sync/changed/sub") + clock.BlockUntil(3) + clock.Advance(quietPeriod) + select { + case actual := <-syncer.synced: + require.ElementsMatch(t, []sync.PathMapping{ + {HostPath: "/sync/changed", ContainerPath: "/work/changed"}, + {HostPath: "/sync/changed/sub", ContainerPath: "/work/changed/sub"}, + }, actual) + case <-time.After(100 * time.Millisecond): + t.Error("timeout") + } + watcher.Events() <- watch.NewFileEvent("/sync/ignore") + watcher.Events() <- watch.NewFileEvent("/sync/ignore/sub") + watcher.Events() <- watch.NewFileEvent("/sync/changed") + clock.BlockUntil(4) + clock.Advance(quietPeriod) + select { + case actual := <-syncer.synced: + require.ElementsMatch(t, []sync.PathMapping{ + {HostPath: "/sync/changed", ContainerPath: "/work/changed"}, + }, actual) + case <-time.After(100 * time.Millisecond): + t.Error("timed out waiting for events") + } + + watcher.Events() <- watch.NewFileEvent("/rebuild") + watcher.Events() <- watch.NewFileEvent("/sync/changed") + clock.BlockUntil(4) + clock.Advance(quietPeriod) + select { + case batch := <-syncer.synced: + t.Fatalf("received unexpected events: %v", batch) + case <-time.After(100 * time.Millisecond): + // expected + } + // TODO: there's not a great way to assert that the rebuild attempt happened +} + +type fakeSyncer struct { + synced chan []sync.PathMapping +} + +func newFakeSyncer() *fakeSyncer { + return &fakeSyncer{ + synced: make(chan []sync.PathMapping), + } +} + +func (f *fakeSyncer) Sync(_ context.Context, _ types.ServiceConfig, paths []sync.PathMapping) error { + f.synced <- paths + return nil } diff --git a/pkg/e2e/watch_test.go b/pkg/e2e/watch_test.go index 38ce73103..9d609724b 100644 --- a/pkg/e2e/watch_test.go +++ b/pkg/e2e/watch_test.go @@ -23,6 +23,7 @@ import ( "strings" "sync/atomic" "testing" + "time" "github.com/distribution/distribution/v3/uuid" "github.com/stretchr/testify/require" @@ -132,7 +133,7 @@ func doTest(t *testing.T, svcName string, tarSync bool) { poll.WaitOn(t, func(t poll.LogT) poll.Result { writeDataFile("hello.txt", "hello world") return checkFileContents("/app/data/hello.txt", "hello world")(t) - }) + }, poll.WithDelay(time.Second)) t.Logf("Modifying file contents") writeDataFile("hello.txt", "hello watch")