diff --git a/cmd/compose/watch.go b/cmd/compose/watch.go index b6e38c307..9fe2293f4 100644 --- a/cmd/compose/watch.go +++ b/cmd/compose/watch.go @@ -117,9 +117,10 @@ func runWatch(ctx context.Context, dockerCli command.Cli, backend api.Service, w } consumer := formatter.NewLogConsumer(ctx, dockerCli.Out(), dockerCli.Err(), false, false, false) - return backend.Watch(ctx, project, services, api.WatchOptions{ - Build: &build, - LogTo: consumer, - Prune: watchOpts.prune, + return backend.Watch(ctx, project, api.WatchOptions{ + Build: &build, + LogTo: consumer, + Prune: watchOpts.prune, + Services: services, }) } diff --git a/cmd/formatter/shortcut.go b/cmd/formatter/shortcut.go index 126d4509f..b4098f10b 100644 --- a/cmd/formatter/shortcut.go +++ b/cmd/formatter/shortcut.go @@ -29,9 +29,7 @@ import ( "github.com/compose-spec/compose-go/v2/types" "github.com/docker/compose/v2/internal/tracing" "github.com/docker/compose/v2/pkg/api" - "github.com/docker/compose/v2/pkg/watch" "github.com/eiannone/keyboard" - "github.com/hashicorp/go-multierror" "github.com/skratchdot/open-golang/open" ) @@ -71,26 +69,13 @@ func (ke *KeyboardError) error() string { } type KeyboardWatch struct { - Watcher watch.Notify Watching bool - WatchFn func(ctx context.Context, doneCh chan bool, project *types.Project, services []string, options api.WatchOptions) error - Ctx context.Context - Cancel context.CancelFunc + Watcher Toggle } -func (kw *KeyboardWatch) isWatching() bool { - return kw.Watching -} - -func (kw *KeyboardWatch) switchWatching() { - kw.Watching = !kw.Watching -} - -func (kw *KeyboardWatch) newContext(ctx context.Context) context.CancelFunc { - ctx, cancel := context.WithCancel(ctx) - kw.Ctx = ctx - kw.Cancel = cancel - return cancel +type Toggle interface { + Start(context.Context) error + Stop() error } type KEYBOARD_LOG_LEVEL int @@ -110,31 +95,21 @@ type LogKeyboard struct { signalChannel chan<- os.Signal } -var ( - KeyboardManager *LogKeyboard - eg multierror.Group -) +// FIXME(ndeloof) we should avoid use of such a global reference. see use in logConsumer +var KeyboardManager *LogKeyboard -func NewKeyboardManager(ctx context.Context, isDockerDesktopActive, isWatchConfigured bool, - sc chan<- os.Signal, - watchFn func(ctx context.Context, - doneCh chan bool, - project *types.Project, - services []string, - options api.WatchOptions, - ) error, -) { - km := LogKeyboard{} - km.IsDockerDesktopActive = isDockerDesktopActive - km.IsWatchConfigured = isWatchConfigured - km.logLevel = INFO - - km.Watch.Watching = false - km.Watch.WatchFn = watchFn - - km.signalChannel = sc - - KeyboardManager = &km +func NewKeyboardManager(isDockerDesktopActive bool, sc chan<- os.Signal, w bool, watcher Toggle) *LogKeyboard { + KeyboardManager = &LogKeyboard{ + Watch: KeyboardWatch{ + Watching: w, + Watcher: watcher, + }, + IsDockerDesktopActive: isDockerDesktopActive, + IsWatchConfigured: true, + logLevel: INFO, + signalChannel: sc, + } + return KeyboardManager } func (lk *LogKeyboard) ClearKeyboardInfo() { @@ -233,48 +208,51 @@ func (lk *LogKeyboard) openDockerDesktop(ctx context.Context, project *types.Pro if !lk.IsDockerDesktopActive { return } - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/gui", tracing.SpanOptions{}, - func(ctx context.Context) error { - link := fmt.Sprintf("docker-desktop://dashboard/apps/%s", project.Name) - err := open.Run(link) - if err != nil { - err = fmt.Errorf("could not open Docker Desktop") - lk.keyboardError("View", err) - } - return err - }), - ) + go func() { + _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/gui", tracing.SpanOptions{}, + func(ctx context.Context) error { + link := fmt.Sprintf("docker-desktop://dashboard/apps/%s", project.Name) + err := open.Run(link) + if err != nil { + err = fmt.Errorf("could not open Docker Desktop") + lk.keyboardError("View", err) + } + return err + })() + }() } func (lk *LogKeyboard) openDDComposeUI(ctx context.Context, project *types.Project) { if !lk.IsDockerDesktopActive { return } - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/composeview", tracing.SpanOptions{}, - func(ctx context.Context) error { - link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s", project.Name) - err := open.Run(link) - if err != nil { - err = fmt.Errorf("could not open Docker Desktop Compose UI") - lk.keyboardError("View Config", err) - } - return err - }), - ) + go func() { + _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/composeview", tracing.SpanOptions{}, + func(ctx context.Context) error { + link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s", project.Name) + err := open.Run(link) + if err != nil { + err = fmt.Errorf("could not open Docker Desktop Compose UI") + lk.keyboardError("View Config", err) + } + return err + })() + }() } func (lk *LogKeyboard) openDDWatchDocs(ctx context.Context, project *types.Project) { - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/watch", tracing.SpanOptions{}, - func(ctx context.Context) error { - link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s/watch", project.Name) - err := open.Run(link) - if err != nil { - err = fmt.Errorf("could not open Docker Desktop Compose UI") - lk.keyboardError("Watch Docs", err) - } - return err - }), - ) + go func() { + _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/watch", tracing.SpanOptions{}, + func(ctx context.Context) error { + link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s/watch", project.Name) + err := open.Run(link) + if err != nil { + err = fmt.Errorf("could not open Docker Desktop Compose UI") + lk.keyboardError("Watch Docs", err) + } + return err + })() + }() } func (lk *LogKeyboard) keyboardError(prefix string, err error) { @@ -288,39 +266,34 @@ func (lk *LogKeyboard) keyboardError(prefix string, err error) { }() } -func (lk *LogKeyboard) StartWatch(ctx context.Context, doneCh chan bool, project *types.Project, options api.UpOptions) { +func (lk *LogKeyboard) ToggleWatch(ctx context.Context, options api.UpOptions) { if !lk.IsWatchConfigured { return } - lk.Watch.switchWatching() - if !lk.Watch.isWatching() { - lk.Watch.Cancel() + if lk.Watch.Watching { + err := lk.Watch.Watcher.Stop() + if err != nil { + options.Start.Attach.Err(api.WatchLogger, err.Error()) + } else { + lk.Watch.Watching = false + } } else { - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{}, - func(ctx context.Context) error { - if options.Create.Build == nil { - err := fmt.Errorf("cannot run watch mode with flag --no-build") - lk.keyboardError("Watch", err) + go func() { + _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{}, + func(ctx context.Context) error { + err := lk.Watch.Watcher.Start(ctx) + if err != nil { + options.Start.Attach.Err(api.WatchLogger, err.Error()) + } else { + lk.Watch.Watching = true + } return err - } - - lk.Watch.newContext(ctx) - buildOpts := *options.Create.Build - buildOpts.Quiet = true - err := lk.Watch.WatchFn(lk.Watch.Ctx, doneCh, project, options.Start.Services, api.WatchOptions{ - Build: &buildOpts, - LogTo: options.Start.Attach, - }) - if err != nil { - lk.Watch.switchWatching() - options.Start.Attach.Err(api.WatchLogger, err.Error()) - } - return err - })) + })() + }() } } -func (lk *LogKeyboard) HandleKeyEvents(event keyboard.KeyEvent, ctx context.Context, doneCh chan bool, project *types.Project, options api.UpOptions) { +func (lk *LogKeyboard) HandleKeyEvents(ctx context.Context, event keyboard.KeyEvent, project *types.Project, options api.UpOptions) { switch kRune := event.Rune; kRune { case 'v': lk.openDockerDesktop(ctx, project) @@ -331,15 +304,16 @@ func (lk *LogKeyboard) HandleKeyEvents(event keyboard.KeyEvent, ctx context.Cont lk.openDDWatchDocs(ctx, project) } // either way we mark menu/watch as an error - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{}, - func(ctx context.Context) error { - err := fmt.Errorf("watch is not yet configured. Learn more: %s", ansiColor(CYAN, "https://docs.docker.com/compose/file-watch/")) - lk.keyboardError("Watch", err) - return err - })) - return + go func() { + _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{}, + func(ctx context.Context) error { + err := fmt.Errorf("watch is not yet configured. Learn more: %s", ansiColor(CYAN, "https://docs.docker.com/compose/file-watch/")) + lk.keyboardError("Watch", err) + return err + })() + }() } - lk.StartWatch(ctx, doneCh, project, options) + lk.ToggleWatch(ctx, options) case 'o': lk.openDDComposeUI(ctx, project) } @@ -350,10 +324,6 @@ func (lk *LogKeyboard) HandleKeyEvents(event keyboard.KeyEvent, ctx context.Cont ShowCursor() lk.logLevel = NONE - if lk.Watch.Watching && lk.Watch.Cancel != nil { - lk.Watch.Cancel() - _ = eg.Wait().ErrorOrNil() // Need to print this ? - } // will notify main thread to kill and will handle gracefully lk.signalChannel <- syscall.SIGINT case keyboard.KeyEnter: diff --git a/pkg/api/api.go b/pkg/api/api.go index 04b7f6c25..21fbbda35 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -85,7 +85,7 @@ type Service interface { // DryRunMode defines if dry run applies to the command DryRunMode(ctx context.Context, dryRun bool) (context.Context, error) // Watch services' development context and sync/notify/rebuild/restart on changes - Watch(ctx context.Context, project *types.Project, services []string, options WatchOptions) error + Watch(ctx context.Context, project *types.Project, options WatchOptions) error // Viz generates a graphviz graph of the project services Viz(ctx context.Context, project *types.Project, options VizOptions) (string, error) // Wait blocks until at least one of the services' container exits @@ -127,9 +127,10 @@ const WatchLogger = "#watch" // WatchOptions group options of the Watch API type WatchOptions struct { - Build *BuildOptions - LogTo LogConsumer - Prune bool + Build *BuildOptions + LogTo LogConsumer + Prune bool + Services []string } // BuildOptions group options of the Build API diff --git a/pkg/compose/up.go b/pkg/compose/up.go index c98570e4f..4580045c5 100644 --- a/pkg/compose/up.go +++ b/pkg/compose/up.go @@ -72,6 +72,15 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options var isTerminated atomic.Bool printer := newLogPrinter(options.Start.Attach) + var watcher *Watcher + if options.Start.Watch { + watcher, err = NewWatcher(project, options, s.watch) + if err != nil { + return err + } + } + + var navigationMenu *formatter.LogKeyboard var kEvents <-chan keyboard.KeyEvent if options.Start.NavigationMenu { kEvents, err = keyboard.GetKeys(100) @@ -80,20 +89,14 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options options.Start.NavigationMenu = false } else { defer keyboard.Close() //nolint:errcheck - isWatchConfigured := s.shouldWatch(project) isDockerDesktopActive := s.isDesktopIntegrationActive() - tracing.KeyboardMetrics(ctx, options.Start.NavigationMenu, isDockerDesktopActive, isWatchConfigured) - formatter.NewKeyboardManager(ctx, isDockerDesktopActive, isWatchConfigured, signalChan, s.watch) + tracing.KeyboardMetrics(ctx, options.Start.NavigationMenu, isDockerDesktopActive, watcher != nil) + navigationMenu = formatter.NewKeyboardManager(isDockerDesktopActive, signalChan, options.Start.Watch, watcher) } } doneCh := make(chan bool) eg.Go(func() error { - if options.Start.NavigationMenu && options.Start.Watch { - // Run watch by navigation menu, so we can interactively enable/disable - formatter.KeyboardManager.StartWatch(ctx, doneCh, project, options) - } - first := true gracefulTeardown := func() { printer.Cancel() @@ -112,6 +115,9 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options for { select { case <-doneCh: + if watcher != nil { + return watcher.Stop() + } return nil case <-ctx.Done(): if first { @@ -119,6 +125,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } case <-signalChan: if first { + keyboard.Close() //nolint:errcheck gracefulTeardown() break } @@ -137,7 +144,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options }) return nil case event := <-kEvents: - formatter.KeyboardManager.HandleKeyEvents(event, ctx, doneCh, project, options) + navigationMenu.HandleKeyEvents(ctx, event, project, options) } } }) @@ -157,15 +164,11 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options return err }) - if options.Start.Watch && !options.Start.NavigationMenu { - eg.Go(func() error { - buildOpts := *options.Create.Build - buildOpts.Quiet = true - return s.watch(ctx, doneCh, project, options.Start.Services, api.WatchOptions{ - Build: &buildOpts, - LogTo: options.Start.Attach, - }) - }) + if options.Start.Watch && watcher != nil { + err = watcher.Start(ctx) + if err != nil { + return err + } } // We use the parent context without cancellation as we manage sigterm to stop the stack diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index e89962aaa..dcc67b5a1 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -26,6 +26,7 @@ import ( "slices" "strconv" "strings" + gsync "sync" "time" "github.com/compose-spec/compose-go/v2/types" @@ -44,6 +45,68 @@ import ( "golang.org/x/sync/errgroup" ) +type WatchFunc func(ctx context.Context, project *types.Project, options api.WatchOptions) (func() error, error) + +type Watcher struct { + project *types.Project + options api.WatchOptions + watchFn WatchFunc + stopFn func() + errCh chan error +} + +func NewWatcher(project *types.Project, options api.UpOptions, w WatchFunc) (*Watcher, error) { + for i := range project.Services { + service := project.Services[i] + + if service.Develop != nil && service.Develop.Watch != nil { + build := options.Create.Build + build.Quiet = true + return &Watcher{ + project: project, + options: api.WatchOptions{ + LogTo: options.Start.Attach, + Build: build, + }, + watchFn: w, + errCh: make(chan error), + }, nil + } + } + // none of the services is eligible to watch + return nil, fmt.Errorf("none of the selected services is configured for watch, see https://docs.docker.com/compose/how-tos/file-watch/") +} + +// ensure state changes are atomic +var mx gsync.Mutex + +func (w *Watcher) Start(ctx context.Context) error { + mx.Lock() + defer mx.Unlock() + ctx, cancelFunc := context.WithCancel(ctx) + w.stopFn = cancelFunc + wait, err := w.watchFn(ctx, w.project, w.options) + if err != nil { + return err + } + go func() { + w.errCh <- wait() + }() + return nil +} + +func (w *Watcher) Stop() error { + mx.Lock() + defer mx.Unlock() + if w.stopFn == nil { + return nil + } + w.stopFn() + w.stopFn = nil + err := <-w.errCh + return err +} + // getSyncImplementation returns an appropriate sync implementation for the // project. // @@ -63,20 +126,12 @@ func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syn return sync.NewTar(project.Name, tarDockerClient{s: s}), nil } -func (s *composeService) shouldWatch(project *types.Project) bool { - var shouldWatch bool - for i := range project.Services { - service := project.Services[i] - - if service.Develop != nil && service.Develop.Watch != nil { - shouldWatch = true - } +func (s *composeService) Watch(ctx context.Context, project *types.Project, options api.WatchOptions) error { + wait, err := s.watch(ctx, project, options) + if err != nil { + return err } - return shouldWatch -} - -func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { - return s.watch(ctx, nil, project, services, options) + return wait() } type watchRule struct { @@ -127,14 +182,14 @@ func (r watchRule) Matches(event watch.FileEvent) *sync.PathMapping { } } -func (s *composeService) watch(ctx context.Context, syncChannel chan bool, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo +func (s *composeService) watch(ctx context.Context, project *types.Project, options api.WatchOptions) (func() error, error) { //nolint: gocyclo var err error - if project, err = project.WithSelectedServices(services); err != nil { - return err + if project, err = project.WithSelectedServices(options.Services); err != nil { + return nil, err } syncer, err := s.getSyncImplementation(project) if err != nil { - return err + return nil, err } eg, ctx := errgroup.WithContext(ctx) options.LogTo.Register(api.WatchLogger) @@ -146,7 +201,7 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje for serviceName, service := range project.Services { config, err := loadDevelopmentConfig(service, project) if err != nil { - return err + return nil, err } if service.Develop != nil { @@ -160,10 +215,10 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje for _, trigger := range config.Watch { if trigger.Action == types.WatchActionRebuild { if service.Build == nil { - return fmt.Errorf("can't watch service %q with action %s without a build context", service.Name, types.WatchActionRebuild) + return nil, fmt.Errorf("can't watch service %q with action %s without a build context", service.Name, types.WatchActionRebuild) } if options.Build == nil { - return fmt.Errorf("--no-build is incompatible with watch action %s in service %s", types.WatchActionRebuild, service.Name) + return nil, fmt.Errorf("--no-build is incompatible with watch action %s in service %s", types.WatchActionRebuild, service.Name) } // set the service to always be built - watch triggers `Up()` when it receives a rebuild event service.PullPolicy = types.PullPolicyBuild @@ -182,7 +237,7 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje // Need to check initial files are in container that are meant to be synched from watch action err := s.initialSync(ctx, project, service, trigger, syncer) if err != nil { - return err + return nil, err } } } @@ -191,45 +246,37 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje serviceWatchRules, err := getWatchRules(config, service) if err != nil { - return err + return nil, err } rules = append(rules, serviceWatchRules...) } if len(paths) == 0 { - return fmt.Errorf("none of the selected services is configured for watch, consider setting a 'develop' section") + return nil, fmt.Errorf("none of the selected services is configured for watch, consider setting a 'develop' section") } watcher, err := watch.NewWatcher(paths) if err != nil { - return err + return nil, err } err = watcher.Start() if err != nil { - return err + return nil, err } - defer func() { - if err := watcher.Close(); err != nil { - logrus.Debugf("Error closing watcher: %v", err) - } - }() - eg.Go(func() error { return s.watchEvents(ctx, project, options, watcher, syncer, rules) }) options.LogTo.Log(api.WatchLogger, "Watch enabled") - for { - select { - case <-ctx.Done(): - return eg.Wait() - case <-syncChannel: - options.LogTo.Log(api.WatchLogger, "Watch disabled") - return nil + return func() error { + err := eg.Wait() + if werr := watcher.Close(); werr != nil { + logrus.Debugf("Error closing Watcher: %v", werr) } - } + return err + }, nil } func getWatchRules(config *types.DevelopConfig, service types.ServiceConfig) ([]watchRule, error) { @@ -295,8 +342,13 @@ func (s *composeService) watchEvents(ctx context.Context, project *types.Project case <-ctx.Done(): options.LogTo.Log(api.WatchLogger, "Watch disabled") return nil - case err := <-watcher.Errors(): - options.LogTo.Err(api.WatchLogger, "Watch disabled with errors") + case err, open := <-watcher.Errors(): + if err != nil { + options.LogTo.Err(api.WatchLogger, "Watch disabled with errors: "+err.Error()) + } + if open { + continue + } return err case batch := <-batchEvents: start := time.Now() diff --git a/pkg/mocks/mock_docker_compose_api.go b/pkg/mocks/mock_docker_compose_api.go index 2fd3a3e72..811187ea7 100644 --- a/pkg/mocks/mock_docker_compose_api.go +++ b/pkg/mocks/mock_docker_compose_api.go @@ -513,17 +513,17 @@ func (mr *MockServiceMockRecorder) Wait(ctx, projectName, options any) *gomock.C } // Watch mocks base method. -func (m *MockService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { +func (m *MockService) Watch(ctx context.Context, project *types.Project, options api.WatchOptions) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Watch", ctx, project, services, options) + ret := m.ctrl.Call(m, "Watch", ctx, project, options) ret0, _ := ret[0].(error) return ret0 } // Watch indicates an expected call of Watch. -func (mr *MockServiceMockRecorder) Watch(ctx, project, services, options any) *gomock.Call { +func (mr *MockServiceMockRecorder) Watch(ctx, project, options any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockService)(nil).Watch), ctx, project, services, options) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockService)(nil).Watch), ctx, project, options) } // MockLogConsumer is a mock of LogConsumer interface. diff --git a/pkg/watch/watcher_darwin.go b/pkg/watch/watcher_darwin.go index 662746057..f440f501f 100644 --- a/pkg/watch/watcher_darwin.go +++ b/pkg/watch/watcher_darwin.go @@ -83,10 +83,11 @@ func (d *fseventNotify) Start() error { numberOfWatches.Add(int64(len(d.stream.Paths))) - d.stream.Start() //nolint:errcheck // FIXME(thaJeztah): should this return an error? - + err := d.stream.Start() + if err != nil { + return err + } go d.loop() - return nil }