(refactoting) Move watch logic into a dedicated Watcher type

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2025-05-22 18:04:38 +02:00 committed by Guillaume Lours
parent 0d0e12cc85
commit 9b67a48c33
7 changed files with 215 additions and 187 deletions

View File

@ -117,9 +117,10 @@ func runWatch(ctx context.Context, dockerCli command.Cli, backend api.Service, w
} }
consumer := formatter.NewLogConsumer(ctx, dockerCli.Out(), dockerCli.Err(), false, false, false) consumer := formatter.NewLogConsumer(ctx, dockerCli.Out(), dockerCli.Err(), false, false, false)
return backend.Watch(ctx, project, services, api.WatchOptions{ return backend.Watch(ctx, project, api.WatchOptions{
Build: &build, Build: &build,
LogTo: consumer, LogTo: consumer,
Prune: watchOpts.prune, Prune: watchOpts.prune,
Services: services,
}) })
} }

View File

@ -29,9 +29,7 @@ import (
"github.com/compose-spec/compose-go/v2/types" "github.com/compose-spec/compose-go/v2/types"
"github.com/docker/compose/v2/internal/tracing" "github.com/docker/compose/v2/internal/tracing"
"github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/watch"
"github.com/eiannone/keyboard" "github.com/eiannone/keyboard"
"github.com/hashicorp/go-multierror"
"github.com/skratchdot/open-golang/open" "github.com/skratchdot/open-golang/open"
) )
@ -71,26 +69,13 @@ func (ke *KeyboardError) error() string {
} }
type KeyboardWatch struct { type KeyboardWatch struct {
Watcher watch.Notify
Watching bool Watching bool
WatchFn func(ctx context.Context, doneCh chan bool, project *types.Project, services []string, options api.WatchOptions) error Watcher Toggle
Ctx context.Context
Cancel context.CancelFunc
} }
func (kw *KeyboardWatch) isWatching() bool { type Toggle interface {
return kw.Watching Start(context.Context) error
} Stop() error
func (kw *KeyboardWatch) switchWatching() {
kw.Watching = !kw.Watching
}
func (kw *KeyboardWatch) newContext(ctx context.Context) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)
kw.Ctx = ctx
kw.Cancel = cancel
return cancel
} }
type KEYBOARD_LOG_LEVEL int type KEYBOARD_LOG_LEVEL int
@ -110,31 +95,21 @@ type LogKeyboard struct {
signalChannel chan<- os.Signal signalChannel chan<- os.Signal
} }
var ( // FIXME(ndeloof) we should avoid use of such a global reference. see use in logConsumer
KeyboardManager *LogKeyboard var KeyboardManager *LogKeyboard
eg multierror.Group
)
func NewKeyboardManager(ctx context.Context, isDockerDesktopActive, isWatchConfigured bool, func NewKeyboardManager(isDockerDesktopActive bool, sc chan<- os.Signal, w bool, watcher Toggle) *LogKeyboard {
sc chan<- os.Signal, KeyboardManager = &LogKeyboard{
watchFn func(ctx context.Context, Watch: KeyboardWatch{
doneCh chan bool, Watching: w,
project *types.Project, Watcher: watcher,
services []string, },
options api.WatchOptions, IsDockerDesktopActive: isDockerDesktopActive,
) error, IsWatchConfigured: true,
) { logLevel: INFO,
km := LogKeyboard{} signalChannel: sc,
km.IsDockerDesktopActive = isDockerDesktopActive }
km.IsWatchConfigured = isWatchConfigured return KeyboardManager
km.logLevel = INFO
km.Watch.Watching = false
km.Watch.WatchFn = watchFn
km.signalChannel = sc
KeyboardManager = &km
} }
func (lk *LogKeyboard) ClearKeyboardInfo() { func (lk *LogKeyboard) ClearKeyboardInfo() {
@ -233,48 +208,51 @@ func (lk *LogKeyboard) openDockerDesktop(ctx context.Context, project *types.Pro
if !lk.IsDockerDesktopActive { if !lk.IsDockerDesktopActive {
return return
} }
eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/gui", tracing.SpanOptions{}, go func() {
func(ctx context.Context) error { _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/gui", tracing.SpanOptions{},
link := fmt.Sprintf("docker-desktop://dashboard/apps/%s", project.Name) func(ctx context.Context) error {
err := open.Run(link) link := fmt.Sprintf("docker-desktop://dashboard/apps/%s", project.Name)
if err != nil { err := open.Run(link)
err = fmt.Errorf("could not open Docker Desktop") if err != nil {
lk.keyboardError("View", err) err = fmt.Errorf("could not open Docker Desktop")
} lk.keyboardError("View", err)
return err }
}), return err
) })()
}()
} }
func (lk *LogKeyboard) openDDComposeUI(ctx context.Context, project *types.Project) { func (lk *LogKeyboard) openDDComposeUI(ctx context.Context, project *types.Project) {
if !lk.IsDockerDesktopActive { if !lk.IsDockerDesktopActive {
return return
} }
eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/composeview", tracing.SpanOptions{}, go func() {
func(ctx context.Context) error { _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/composeview", tracing.SpanOptions{},
link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s", project.Name) func(ctx context.Context) error {
err := open.Run(link) link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s", project.Name)
if err != nil { err := open.Run(link)
err = fmt.Errorf("could not open Docker Desktop Compose UI") if err != nil {
lk.keyboardError("View Config", err) err = fmt.Errorf("could not open Docker Desktop Compose UI")
} lk.keyboardError("View Config", err)
return err }
}), return err
) })()
}()
} }
func (lk *LogKeyboard) openDDWatchDocs(ctx context.Context, project *types.Project) { func (lk *LogKeyboard) openDDWatchDocs(ctx context.Context, project *types.Project) {
eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/watch", tracing.SpanOptions{}, go func() {
func(ctx context.Context) error { _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/gui/watch", tracing.SpanOptions{},
link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s/watch", project.Name) func(ctx context.Context) error {
err := open.Run(link) link := fmt.Sprintf("docker-desktop://dashboard/docker-compose/%s/watch", project.Name)
if err != nil { err := open.Run(link)
err = fmt.Errorf("could not open Docker Desktop Compose UI") if err != nil {
lk.keyboardError("Watch Docs", err) err = fmt.Errorf("could not open Docker Desktop Compose UI")
} lk.keyboardError("Watch Docs", err)
return err }
}), return err
) })()
}()
} }
func (lk *LogKeyboard) keyboardError(prefix string, err error) { func (lk *LogKeyboard) keyboardError(prefix string, err error) {
@ -288,39 +266,34 @@ func (lk *LogKeyboard) keyboardError(prefix string, err error) {
}() }()
} }
func (lk *LogKeyboard) StartWatch(ctx context.Context, doneCh chan bool, project *types.Project, options api.UpOptions) { func (lk *LogKeyboard) ToggleWatch(ctx context.Context, options api.UpOptions) {
if !lk.IsWatchConfigured { if !lk.IsWatchConfigured {
return return
} }
lk.Watch.switchWatching() if lk.Watch.Watching {
if !lk.Watch.isWatching() { err := lk.Watch.Watcher.Stop()
lk.Watch.Cancel() if err != nil {
options.Start.Attach.Err(api.WatchLogger, err.Error())
} else {
lk.Watch.Watching = false
}
} else { } else {
eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{}, go func() {
func(ctx context.Context) error { _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{},
if options.Create.Build == nil { func(ctx context.Context) error {
err := fmt.Errorf("cannot run watch mode with flag --no-build") err := lk.Watch.Watcher.Start(ctx)
lk.keyboardError("Watch", err) if err != nil {
options.Start.Attach.Err(api.WatchLogger, err.Error())
} else {
lk.Watch.Watching = true
}
return err return err
} })()
}()
lk.Watch.newContext(ctx)
buildOpts := *options.Create.Build
buildOpts.Quiet = true
err := lk.Watch.WatchFn(lk.Watch.Ctx, doneCh, project, options.Start.Services, api.WatchOptions{
Build: &buildOpts,
LogTo: options.Start.Attach,
})
if err != nil {
lk.Watch.switchWatching()
options.Start.Attach.Err(api.WatchLogger, err.Error())
}
return err
}))
} }
} }
func (lk *LogKeyboard) HandleKeyEvents(event keyboard.KeyEvent, ctx context.Context, doneCh chan bool, project *types.Project, options api.UpOptions) { func (lk *LogKeyboard) HandleKeyEvents(ctx context.Context, event keyboard.KeyEvent, project *types.Project, options api.UpOptions) {
switch kRune := event.Rune; kRune { switch kRune := event.Rune; kRune {
case 'v': case 'v':
lk.openDockerDesktop(ctx, project) lk.openDockerDesktop(ctx, project)
@ -331,15 +304,16 @@ func (lk *LogKeyboard) HandleKeyEvents(event keyboard.KeyEvent, ctx context.Cont
lk.openDDWatchDocs(ctx, project) lk.openDDWatchDocs(ctx, project)
} }
// either way we mark menu/watch as an error // either way we mark menu/watch as an error
eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{}, go func() {
func(ctx context.Context) error { _ = tracing.EventWrapFuncForErrGroup(ctx, "menu/watch", tracing.SpanOptions{},
err := fmt.Errorf("watch is not yet configured. Learn more: %s", ansiColor(CYAN, "https://docs.docker.com/compose/file-watch/")) func(ctx context.Context) error {
lk.keyboardError("Watch", err) err := fmt.Errorf("watch is not yet configured. Learn more: %s", ansiColor(CYAN, "https://docs.docker.com/compose/file-watch/"))
return err lk.keyboardError("Watch", err)
})) return err
return })()
}()
} }
lk.StartWatch(ctx, doneCh, project, options) lk.ToggleWatch(ctx, options)
case 'o': case 'o':
lk.openDDComposeUI(ctx, project) lk.openDDComposeUI(ctx, project)
} }
@ -350,10 +324,6 @@ func (lk *LogKeyboard) HandleKeyEvents(event keyboard.KeyEvent, ctx context.Cont
ShowCursor() ShowCursor()
lk.logLevel = NONE lk.logLevel = NONE
if lk.Watch.Watching && lk.Watch.Cancel != nil {
lk.Watch.Cancel()
_ = eg.Wait().ErrorOrNil() // Need to print this ?
}
// will notify main thread to kill and will handle gracefully // will notify main thread to kill and will handle gracefully
lk.signalChannel <- syscall.SIGINT lk.signalChannel <- syscall.SIGINT
case keyboard.KeyEnter: case keyboard.KeyEnter:

View File

@ -85,7 +85,7 @@ type Service interface {
// DryRunMode defines if dry run applies to the command // DryRunMode defines if dry run applies to the command
DryRunMode(ctx context.Context, dryRun bool) (context.Context, error) DryRunMode(ctx context.Context, dryRun bool) (context.Context, error)
// Watch services' development context and sync/notify/rebuild/restart on changes // Watch services' development context and sync/notify/rebuild/restart on changes
Watch(ctx context.Context, project *types.Project, services []string, options WatchOptions) error Watch(ctx context.Context, project *types.Project, options WatchOptions) error
// Viz generates a graphviz graph of the project services // Viz generates a graphviz graph of the project services
Viz(ctx context.Context, project *types.Project, options VizOptions) (string, error) Viz(ctx context.Context, project *types.Project, options VizOptions) (string, error)
// Wait blocks until at least one of the services' container exits // Wait blocks until at least one of the services' container exits
@ -127,9 +127,10 @@ const WatchLogger = "#watch"
// WatchOptions group options of the Watch API // WatchOptions group options of the Watch API
type WatchOptions struct { type WatchOptions struct {
Build *BuildOptions Build *BuildOptions
LogTo LogConsumer LogTo LogConsumer
Prune bool Prune bool
Services []string
} }
// BuildOptions group options of the Build API // BuildOptions group options of the Build API

View File

@ -72,6 +72,15 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
var isTerminated atomic.Bool var isTerminated atomic.Bool
printer := newLogPrinter(options.Start.Attach) printer := newLogPrinter(options.Start.Attach)
var watcher *Watcher
if options.Start.Watch {
watcher, err = NewWatcher(project, options, s.watch)
if err != nil {
return err
}
}
var navigationMenu *formatter.LogKeyboard
var kEvents <-chan keyboard.KeyEvent var kEvents <-chan keyboard.KeyEvent
if options.Start.NavigationMenu { if options.Start.NavigationMenu {
kEvents, err = keyboard.GetKeys(100) kEvents, err = keyboard.GetKeys(100)
@ -80,20 +89,14 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
options.Start.NavigationMenu = false options.Start.NavigationMenu = false
} else { } else {
defer keyboard.Close() //nolint:errcheck defer keyboard.Close() //nolint:errcheck
isWatchConfigured := s.shouldWatch(project)
isDockerDesktopActive := s.isDesktopIntegrationActive() isDockerDesktopActive := s.isDesktopIntegrationActive()
tracing.KeyboardMetrics(ctx, options.Start.NavigationMenu, isDockerDesktopActive, isWatchConfigured) tracing.KeyboardMetrics(ctx, options.Start.NavigationMenu, isDockerDesktopActive, watcher != nil)
formatter.NewKeyboardManager(ctx, isDockerDesktopActive, isWatchConfigured, signalChan, s.watch) navigationMenu = formatter.NewKeyboardManager(isDockerDesktopActive, signalChan, options.Start.Watch, watcher)
} }
} }
doneCh := make(chan bool) doneCh := make(chan bool)
eg.Go(func() error { eg.Go(func() error {
if options.Start.NavigationMenu && options.Start.Watch {
// Run watch by navigation menu, so we can interactively enable/disable
formatter.KeyboardManager.StartWatch(ctx, doneCh, project, options)
}
first := true first := true
gracefulTeardown := func() { gracefulTeardown := func() {
printer.Cancel() printer.Cancel()
@ -112,6 +115,9 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
for { for {
select { select {
case <-doneCh: case <-doneCh:
if watcher != nil {
return watcher.Stop()
}
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
if first { if first {
@ -119,6 +125,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
} }
case <-signalChan: case <-signalChan:
if first { if first {
keyboard.Close() //nolint:errcheck
gracefulTeardown() gracefulTeardown()
break break
} }
@ -137,7 +144,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}) })
return nil return nil
case event := <-kEvents: case event := <-kEvents:
formatter.KeyboardManager.HandleKeyEvents(event, ctx, doneCh, project, options) navigationMenu.HandleKeyEvents(ctx, event, project, options)
} }
} }
}) })
@ -157,15 +164,11 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err return err
}) })
if options.Start.Watch && !options.Start.NavigationMenu { if options.Start.Watch && watcher != nil {
eg.Go(func() error { err = watcher.Start(ctx)
buildOpts := *options.Create.Build if err != nil {
buildOpts.Quiet = true return err
return s.watch(ctx, doneCh, project, options.Start.Services, api.WatchOptions{ }
Build: &buildOpts,
LogTo: options.Start.Attach,
})
})
} }
// We use the parent context without cancellation as we manage sigterm to stop the stack // We use the parent context without cancellation as we manage sigterm to stop the stack

View File

@ -26,6 +26,7 @@ import (
"slices" "slices"
"strconv" "strconv"
"strings" "strings"
gsync "sync"
"time" "time"
"github.com/compose-spec/compose-go/v2/types" "github.com/compose-spec/compose-go/v2/types"
@ -44,6 +45,68 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type WatchFunc func(ctx context.Context, project *types.Project, options api.WatchOptions) (func() error, error)
type Watcher struct {
project *types.Project
options api.WatchOptions
watchFn WatchFunc
stopFn func()
errCh chan error
}
func NewWatcher(project *types.Project, options api.UpOptions, w WatchFunc) (*Watcher, error) {
for i := range project.Services {
service := project.Services[i]
if service.Develop != nil && service.Develop.Watch != nil {
build := options.Create.Build
build.Quiet = true
return &Watcher{
project: project,
options: api.WatchOptions{
LogTo: options.Start.Attach,
Build: build,
},
watchFn: w,
errCh: make(chan error),
}, nil
}
}
// none of the services is eligible to watch
return nil, fmt.Errorf("none of the selected services is configured for watch, see https://docs.docker.com/compose/how-tos/file-watch/")
}
// ensure state changes are atomic
var mx gsync.Mutex
func (w *Watcher) Start(ctx context.Context) error {
mx.Lock()
defer mx.Unlock()
ctx, cancelFunc := context.WithCancel(ctx)
w.stopFn = cancelFunc
wait, err := w.watchFn(ctx, w.project, w.options)
if err != nil {
return err
}
go func() {
w.errCh <- wait()
}()
return nil
}
func (w *Watcher) Stop() error {
mx.Lock()
defer mx.Unlock()
if w.stopFn == nil {
return nil
}
w.stopFn()
w.stopFn = nil
err := <-w.errCh
return err
}
// getSyncImplementation returns an appropriate sync implementation for the // getSyncImplementation returns an appropriate sync implementation for the
// project. // project.
// //
@ -63,20 +126,12 @@ func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syn
return sync.NewTar(project.Name, tarDockerClient{s: s}), nil return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
} }
func (s *composeService) shouldWatch(project *types.Project) bool { func (s *composeService) Watch(ctx context.Context, project *types.Project, options api.WatchOptions) error {
var shouldWatch bool wait, err := s.watch(ctx, project, options)
for i := range project.Services { if err != nil {
service := project.Services[i] return err
if service.Develop != nil && service.Develop.Watch != nil {
shouldWatch = true
}
} }
return shouldWatch return wait()
}
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error {
return s.watch(ctx, nil, project, services, options)
} }
type watchRule struct { type watchRule struct {
@ -127,14 +182,14 @@ func (r watchRule) Matches(event watch.FileEvent) *sync.PathMapping {
} }
} }
func (s *composeService) watch(ctx context.Context, syncChannel chan bool, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo func (s *composeService) watch(ctx context.Context, project *types.Project, options api.WatchOptions) (func() error, error) { //nolint: gocyclo
var err error var err error
if project, err = project.WithSelectedServices(services); err != nil { if project, err = project.WithSelectedServices(options.Services); err != nil {
return err return nil, err
} }
syncer, err := s.getSyncImplementation(project) syncer, err := s.getSyncImplementation(project)
if err != nil { if err != nil {
return err return nil, err
} }
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
options.LogTo.Register(api.WatchLogger) options.LogTo.Register(api.WatchLogger)
@ -146,7 +201,7 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
for serviceName, service := range project.Services { for serviceName, service := range project.Services {
config, err := loadDevelopmentConfig(service, project) config, err := loadDevelopmentConfig(service, project)
if err != nil { if err != nil {
return err return nil, err
} }
if service.Develop != nil { if service.Develop != nil {
@ -160,10 +215,10 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
for _, trigger := range config.Watch { for _, trigger := range config.Watch {
if trigger.Action == types.WatchActionRebuild { if trigger.Action == types.WatchActionRebuild {
if service.Build == nil { if service.Build == nil {
return fmt.Errorf("can't watch service %q with action %s without a build context", service.Name, types.WatchActionRebuild) return nil, fmt.Errorf("can't watch service %q with action %s without a build context", service.Name, types.WatchActionRebuild)
} }
if options.Build == nil { if options.Build == nil {
return fmt.Errorf("--no-build is incompatible with watch action %s in service %s", types.WatchActionRebuild, service.Name) return nil, fmt.Errorf("--no-build is incompatible with watch action %s in service %s", types.WatchActionRebuild, service.Name)
} }
// set the service to always be built - watch triggers `Up()` when it receives a rebuild event // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
service.PullPolicy = types.PullPolicyBuild service.PullPolicy = types.PullPolicyBuild
@ -182,7 +237,7 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
// Need to check initial files are in container that are meant to be synched from watch action // Need to check initial files are in container that are meant to be synched from watch action
err := s.initialSync(ctx, project, service, trigger, syncer) err := s.initialSync(ctx, project, service, trigger, syncer)
if err != nil { if err != nil {
return err return nil, err
} }
} }
} }
@ -191,45 +246,37 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
serviceWatchRules, err := getWatchRules(config, service) serviceWatchRules, err := getWatchRules(config, service)
if err != nil { if err != nil {
return err return nil, err
} }
rules = append(rules, serviceWatchRules...) rules = append(rules, serviceWatchRules...)
} }
if len(paths) == 0 { if len(paths) == 0 {
return fmt.Errorf("none of the selected services is configured for watch, consider setting a 'develop' section") return nil, fmt.Errorf("none of the selected services is configured for watch, consider setting a 'develop' section")
} }
watcher, err := watch.NewWatcher(paths) watcher, err := watch.NewWatcher(paths)
if err != nil { if err != nil {
return err return nil, err
} }
err = watcher.Start() err = watcher.Start()
if err != nil { if err != nil {
return err return nil, err
} }
defer func() {
if err := watcher.Close(); err != nil {
logrus.Debugf("Error closing watcher: %v", err)
}
}()
eg.Go(func() error { eg.Go(func() error {
return s.watchEvents(ctx, project, options, watcher, syncer, rules) return s.watchEvents(ctx, project, options, watcher, syncer, rules)
}) })
options.LogTo.Log(api.WatchLogger, "Watch enabled") options.LogTo.Log(api.WatchLogger, "Watch enabled")
for { return func() error {
select { err := eg.Wait()
case <-ctx.Done(): if werr := watcher.Close(); werr != nil {
return eg.Wait() logrus.Debugf("Error closing Watcher: %v", werr)
case <-syncChannel:
options.LogTo.Log(api.WatchLogger, "Watch disabled")
return nil
} }
} return err
}, nil
} }
func getWatchRules(config *types.DevelopConfig, service types.ServiceConfig) ([]watchRule, error) { func getWatchRules(config *types.DevelopConfig, service types.ServiceConfig) ([]watchRule, error) {
@ -295,8 +342,13 @@ func (s *composeService) watchEvents(ctx context.Context, project *types.Project
case <-ctx.Done(): case <-ctx.Done():
options.LogTo.Log(api.WatchLogger, "Watch disabled") options.LogTo.Log(api.WatchLogger, "Watch disabled")
return nil return nil
case err := <-watcher.Errors(): case err, open := <-watcher.Errors():
options.LogTo.Err(api.WatchLogger, "Watch disabled with errors") if err != nil {
options.LogTo.Err(api.WatchLogger, "Watch disabled with errors: "+err.Error())
}
if open {
continue
}
return err return err
case batch := <-batchEvents: case batch := <-batchEvents:
start := time.Now() start := time.Now()

View File

@ -513,17 +513,17 @@ func (mr *MockServiceMockRecorder) Wait(ctx, projectName, options any) *gomock.C
} }
// Watch mocks base method. // Watch mocks base method.
func (m *MockService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { func (m *MockService) Watch(ctx context.Context, project *types.Project, options api.WatchOptions) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Watch", ctx, project, services, options) ret := m.ctrl.Call(m, "Watch", ctx, project, options)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Watch indicates an expected call of Watch. // Watch indicates an expected call of Watch.
func (mr *MockServiceMockRecorder) Watch(ctx, project, services, options any) *gomock.Call { func (mr *MockServiceMockRecorder) Watch(ctx, project, options any) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockService)(nil).Watch), ctx, project, services, options) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockService)(nil).Watch), ctx, project, options)
} }
// MockLogConsumer is a mock of LogConsumer interface. // MockLogConsumer is a mock of LogConsumer interface.

View File

@ -83,10 +83,11 @@ func (d *fseventNotify) Start() error {
numberOfWatches.Add(int64(len(d.stream.Paths))) numberOfWatches.Add(int64(len(d.stream.Paths)))
d.stream.Start() //nolint:errcheck // FIXME(thaJeztah): should this return an error? err := d.stream.Start()
if err != nil {
return err
}
go d.loop() go d.loop()
return nil return nil
} }