manage watch applied to multiple services

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
Nicolas De Loof 2025-01-15 12:24:39 +01:00 committed by Guillaume Lours
parent 52578c0998
commit ed10804e0f
11 changed files with 389 additions and 493 deletions

View File

@ -16,8 +16,6 @@ package sync
import ( import (
"context" "context"
"github.com/compose-spec/compose-go/v2/types"
) )
// PathMapping contains the Compose service and modified host system path. // PathMapping contains the Compose service and modified host system path.
@ -38,5 +36,5 @@ type PathMapping struct {
} }
type Syncer interface { type Syncer interface {
Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error Sync(ctx context.Context, service string, paths []*PathMapping) error
} }
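
The reworked Syncer contract identifies the service by name and passes mappings as pointers. A minimal sketch of an implementation against that contract; logSyncer is hypothetical, only the interface shape and the PathMapping fields come from the code above:

package sync

import (
	"context"
	"log"
)

// logSyncer is a hypothetical Syncer used only to illustrate the new signature.
type logSyncer struct{}

var _ Syncer = logSyncer{}

func (logSyncer) Sync(_ context.Context, service string, paths []*PathMapping) error {
	for _, p := range paths {
		// service is now just the Compose service name, not the full ServiceConfig
		log.Printf("service %s: %s -> %s", service, p.HostPath, p.ContainerPath)
	}
	return nil
}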

View File

@ -32,7 +32,6 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/compose-spec/compose-go/v2/types"
moby "github.com/docker/docker/api/types" moby "github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/archive"
) )
@ -65,8 +64,8 @@ func NewTar(projectName string, client LowLevelClient) *Tar {
} }
} }
func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error { func (t *Tar) Sync(ctx context.Context, service string, paths []*PathMapping) error {
containers, err := t.client.ContainersForService(ctx, t.projectName, service.Name) containers, err := t.client.ContainersForService(ctx, t.projectName, service)
if err != nil { if err != nil {
return err return err
} }
@ -77,7 +76,7 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) { if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) {
pathsToDelete = append(pathsToDelete, p.ContainerPath) pathsToDelete = append(pathsToDelete, p.ContainerPath)
} else { } else {
pathsToCopy = append(pathsToCopy, p) pathsToCopy = append(pathsToCopy, *p)
} }
} }
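
The hunk above hinges on a stat-based split: host paths that no longer exist are scheduled for deletion in the container, everything else is copied. That decision in isolation, as a self-contained sketch (splitPaths is an illustrative name, not part of the commit):

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

// splitPaths separates host paths into those to copy and those to delete,
// mirroring the check Tar.Sync performs per PathMapping.
func splitPaths(hostPaths []string) (toCopy, toDelete []string) {
	for _, p := range hostPaths {
		if _, err := os.Stat(p); errors.Is(err, fs.ErrNotExist) {
			toDelete = append(toDelete, p)
		} else {
			toCopy = append(toCopy, p)
		}
	}
	return toCopy, toDelete
}

func main() {
	copyList, deleteList := splitPaths([]string{"/etc/hostname", "/does/not/exist"})
	fmt.Println("copy:", copyList, "delete:", deleteList)
}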

View File

@ -23,11 +23,13 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/compose-spec/compose-go/v2/types" "github.com/compose-spec/compose-go/v2/types"
"github.com/compose-spec/compose-go/v2/utils"
ccli "github.com/docker/cli/cli/command/container" ccli "github.com/docker/cli/cli/command/container"
pathutil "github.com/docker/compose/v2/internal/paths" pathutil "github.com/docker/compose/v2/internal/paths"
"github.com/docker/compose/v2/internal/sync" "github.com/docker/compose/v2/internal/sync"
@ -37,20 +39,11 @@ import (
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/image"
"github.com/jonboulle/clockwork"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
const quietPeriod = 500 * time.Millisecond
// fileEvent contains the Compose service and modified host system path.
type fileEvent struct {
sync.PathMapping
Trigger types.Trigger
}
// getSyncImplementation returns an appropriate sync implementation for the // getSyncImplementation returns an appropriate sync implementation for the
// project. // project.
// //
@ -86,6 +79,44 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
return s.watch(ctx, nil, project, services, options) return s.watch(ctx, nil, project, services, options)
} }
type watchRule struct {
types.Trigger
ignore watch.PathMatcher
service string
}
func (r watchRule) Matches(event watch.FileEvent) *sync.PathMapping {
hostPath := string(event)
if !pathutil.IsChild(r.Path, hostPath) {
return nil
}
isIgnored, err := r.ignore.Matches(hostPath)
if err != nil {
logrus.Warnf("error ignore matching %q: %v", hostPath, err)
return nil
}
if isIgnored {
logrus.Debugf("%s is matching ignore pattern", hostPath)
return nil
}
var containerPath string
if r.Target != "" {
rel, err := filepath.Rel(r.Path, hostPath)
if err != nil {
logrus.Warnf("error making %s relative to %s: %v", hostPath, r.Path, err)
return nil
}
// always use Unix-style paths for inside the container
containerPath = path.Join(r.Target, filepath.ToSlash(rel))
}
return &sync.PathMapping{
HostPath: hostPath,
ContainerPath: containerPath,
}
}
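
watchRule.Matches turns a host filesystem event into a container path by making the host path relative to the watched path and re-joining it under the trigger target with Unix separators. The path arithmetic in isolation, as a hedged sketch with ignore matching left out (mapToContainer is an illustrative name):

package main

import (
	"fmt"
	"path"
	"path/filepath"
	"strings"
)

// mapToContainer mirrors the target-path logic of watchRule.Matches.
func mapToContainer(watchPath, target, hostPath string) (string, bool) {
	rel, err := filepath.Rel(watchPath, hostPath)
	if err != nil || strings.HasPrefix(rel, "..") {
		return "", false // event is outside the watched tree
	}
	// container paths are always Unix-style, regardless of the host OS
	return path.Join(target, filepath.ToSlash(rel)), true
}

func main() {
	p, ok := mapToContainer("/sync", "/work", "/sync/changed/sub")
	fmt.Println(p, ok) // /work/changed/sub true
}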
func (s *composeService) watch(ctx context.Context, syncChannel chan bool, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo func (s *composeService) watch(ctx context.Context, syncChannel chan bool, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
var err error var err error
if project, err = project.WithSelectedServices(services); err != nil { if project, err = project.WithSelectedServices(services); err != nil {
@ -96,10 +127,13 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
return err return err
} }
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
watching := false
options.LogTo.Register(api.WatchLogger) options.LogTo.Register(api.WatchLogger)
for i := range project.Services {
service := project.Services[i] var (
rules []watchRule
paths []string
)
for serviceName, service := range project.Services {
config, err := loadDevelopmentConfig(service, project) config, err := loadDevelopmentConfig(service, project)
if err != nil { if err != nil {
return err return err
@ -123,29 +157,10 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
} }
// set the service to always be built - watch triggers `Up()` when it receives a rebuild event // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
service.PullPolicy = types.PullPolicyBuild service.PullPolicy = types.PullPolicyBuild
project.Services[i] = service project.Services[serviceName] = service
} }
} }
dockerIgnores, err := watch.LoadDockerIgnore(service.Build)
if err != nil {
return err
}
// add a hardcoded set of ignores on top of what came from .dockerignore
// some of this should likely be configurable (e.g. there could be cases
// where you want `.git` to be synced) but this is suitable for now
dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
if err != nil {
return err
}
ignore := watch.NewCompositeMatcher(
dockerIgnores,
watch.EphemeralPathMatcher(),
dotGitIgnore,
)
var paths, pathLogs []string
for _, trigger := range config.Watch { for _, trigger := range config.Watch {
if isSync(trigger) && checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) { if isSync(trigger) && checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path) logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
@ -155,42 +170,45 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
success, err := trigger.Extensions.Get("x-initialSync", &initialSync) success, err := trigger.Extensions.Get("x-initialSync", &initialSync)
if err == nil && success && initialSync && isSync(trigger) { if err == nil && success && initialSync && isSync(trigger) {
// Need to check initial files are in container that are meant to be synched from watch action // Need to check initial files are in container that are meant to be synched from watch action
err := s.initialSync(ctx, project, service, trigger, ignore, syncer) err := s.initialSync(ctx, project, service, trigger, syncer)
if err != nil { if err != nil {
return err return err
} }
} }
} }
paths = append(paths, trigger.Path) paths = append(paths, trigger.Path)
pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path))
} }
watcher, err := watch.NewWatcher(paths, ignore) serviceWatchRules, err := getWatchRules(config, service)
if err != nil { if err != nil {
return err return err
} }
rules = append(rules, serviceWatchRules...)
logrus.Debugf("Watch configuration for service %q:%s\n",
service.Name,
strings.Join(append([]string{""}, pathLogs...), "\n - "),
)
err = watcher.Start()
if err != nil {
return err
}
watching = true
eg.Go(func() error {
defer func() {
if err := watcher.Close(); err != nil {
logrus.Debugf("Error closing watcher for service %s: %v", service.Name, err)
}
}()
return s.watchEvents(ctx, project, service.Name, options, watcher, syncer, config.Watch)
})
} }
if !watching {
if len(paths) == 0 {
return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section") return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section")
} }
watcher, err := watch.NewWatcher(paths)
if err != nil {
return err
}
err = watcher.Start()
if err != nil {
return err
}
defer func() {
if err := watcher.Close(); err != nil {
logrus.Debugf("Error closing watcher: %v", err)
}
}()
eg.Go(func() error {
return s.watchEvents(ctx, project, options, watcher, syncer, rules)
})
options.LogTo.Log(api.WatchLogger, "Watch enabled") options.LogTo.Log(api.WatchLogger, "Watch enabled")
for { for {
@ -204,103 +222,73 @@ func (s *composeService) watch(ctx context.Context, syncChannel chan bool, proje
} }
} }
func getWatchRules(config *types.DevelopConfig, service types.ServiceConfig) ([]watchRule, error) {
var rules []watchRule
dockerIgnores, err := watch.LoadDockerIgnore(service.Build)
if err != nil {
return nil, err
}
// add a hardcoded set of ignores on top of what came from .dockerignore
// some of this should likely be configurable (e.g. there could be cases
// where you want `.git` to be synced) but this is suitable for now
dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
if err != nil {
return nil, err
}
for _, trigger := range config.Watch {
ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
if err != nil {
return nil, err
}
rules = append(rules, watchRule{
Trigger: trigger,
ignore: watch.NewCompositeMatcher(
dockerIgnores,
watch.EphemeralPathMatcher(),
dotGitIgnore,
ignore,
),
service: service.Name,
})
}
return rules, nil
}
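
getWatchRules layers several ignore sources per trigger: the service's .dockerignore, the ephemeral-path matcher, a hardcoded .git/ rule, and the trigger's own ignore list, combined so that any one of them can suppress an event. A generic sketch of that composite idea; the real watch.PathMatcher interface has more methods than shown here:

package main

import (
	"fmt"
	"strings"
)

// matcher is a stripped-down stand-in for watch.PathMatcher.
type matcher interface {
	Matches(path string) (bool, error)
}

// composite reports a match if any child matcher matches, the idea behind
// watch.NewCompositeMatcher.
type composite []matcher

func (c composite) Matches(path string) (bool, error) {
	for _, m := range c {
		ok, err := m.Matches(path)
		if err != nil || ok {
			return ok, err
		}
	}
	return false, nil
}

// suffix ignores any path ending with the given suffix (illustration only).
type suffix string

func (s suffix) Matches(path string) (bool, error) {
	return strings.HasSuffix(path, string(s)), nil
}

func main() {
	ignore := composite{suffix(".swp"), suffix("~")}
	ok, _ := ignore.Matches("/src/main.go~")
	fmt.Println(ok) // true
}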
func isSync(trigger types.Trigger) bool { func isSync(trigger types.Trigger) bool {
return trigger.Action == types.WatchActionSync || trigger.Action == types.WatchActionSyncRestart return trigger.Action == types.WatchActionSync || trigger.Action == types.WatchActionSyncRestart
} }
func (s *composeService) watchEvents(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error { func (s *composeService) watchEvents(ctx context.Context, project *types.Project, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, rules []watchRule) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
ignores := make([]watch.PathMatcher, len(triggers)) // debounce and group filesystem events so that we capture IDE saving many files as one "batch" event
for i, trigger := range triggers { batchEvents := watch.BatchDebounceEvents(ctx, s.clock, watcher.Events())
ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
if err != nil {
return err
}
ignores[i] = ignore
}
events := make(chan fileEvent)
batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
quit := make(chan bool)
go func() {
for {
select {
case <-ctx.Done():
quit <- true
return
case batch := <-batchEvents:
start := time.Now()
logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
if err := s.handleWatchBatch(ctx, project, name, options, batch, syncer); err != nil {
logrus.Warnf("Error handling changed files for service %s: %v", name, err)
}
logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
name, time.Since(start), len(batch))
}
}
}()
for { for {
select { select {
case <-quit: case <-ctx.Done():
options.LogTo.Log(api.WatchLogger, "Watch disabled") options.LogTo.Log(api.WatchLogger, "Watch disabled")
return nil return nil
case err := <-watcher.Errors(): case err := <-watcher.Errors():
options.LogTo.Err(api.WatchLogger, "Watch disabled with errors") options.LogTo.Err(api.WatchLogger, "Watch disabled with errors")
return err return err
case event := <-watcher.Events(): case batch := <-batchEvents:
hostPath := event.Path() start := time.Now()
for i, trigger := range triggers { logrus.Debugf("batch start: count[%d]", len(batch))
logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path) err := s.handleWatchBatch(ctx, project, options, batch, rules, syncer)
if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil { if err != nil {
events <- *fileEvent logrus.Warnf("Error handling changed files: %v", err)
}
} }
logrus.Debugf("batch complete: duration[%s] count[%d]", time.Since(start), len(batch))
} }
} }
} }
// maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
// rules.
//
// Any errors are logged as warnings and nil (no file event) is returned.
func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
if !pathutil.IsChild(trigger.Path, hostPath) {
return nil
}
isIgnored, err := ignore.Matches(hostPath)
if err != nil {
logrus.Warnf("error ignore matching %q: %v", hostPath, err)
return nil
}
if isIgnored {
logrus.Debugf("%s is matching ignore pattern", hostPath)
return nil
}
var containerPath string
if trigger.Target != "" {
rel, err := filepath.Rel(trigger.Path, hostPath)
if err != nil {
logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
return nil
}
// always use Unix-style paths for inside the container
containerPath = path.Join(trigger.Target, filepath.ToSlash(rel))
}
return &fileEvent{
Trigger: trigger,
PathMapping: sync.PathMapping{
HostPath: hostPath,
ContainerPath: containerPath,
},
}
}
func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) { func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) {
var config types.DevelopConfig var config types.DevelopConfig
y, ok := service.Extensions["x-develop"] y, ok := service.Extensions["x-develop"]
@ -342,52 +330,6 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
return &config, nil return &config, nil
} }
// batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
// channel.
//
// The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
out := make(chan []fileEvent)
go func() {
defer close(out)
seen := make(map[string]fileEvent)
flushEvents := func() {
if len(seen) == 0 {
return
}
events := make([]fileEvent, 0, len(seen))
for _, e := range seen {
events = append(events, e)
}
out <- events
seen = make(map[string]fileEvent)
}
t := clock.NewTicker(delay)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.Chan():
flushEvents()
case e, ok := <-input:
if !ok {
// input channel was closed
flushEvents()
return
}
if _, ok := seen[e.HostPath]; !ok {
// already know updated path, first rule in watch configuration wins
seen[e.HostPath] = e
}
t.Reset(delay)
}
}
}()
return out
}
func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool { func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
for _, volume := range volumes { for _, volume := range volumes {
if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) { if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
@ -475,39 +417,60 @@ func (t tarDockerClient) Untar(ctx context.Context, id string, archive io.ReadCl
}) })
} }
func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions, batch []fileEvent, syncer sync.Syncer) error { //nolint:gocyclo
pathMappings := make([]sync.PathMapping, len(batch)) func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, options api.WatchOptions, batch []watch.FileEvent, rules []watchRule, syncer sync.Syncer) error {
restartService := false var (
syncService := false restart = map[string]bool{}
for i := range batch { syncfiles = map[string][]*sync.PathMapping{}
switch batch[i].Trigger.Action { exec = map[string][]int{}
case types.WatchActionRebuild: rebuild = map[string]bool{}
return s.rebuild(ctx, project, serviceName, options) )
case types.WatchActionSync, types.WatchActionSyncExec: for _, event := range batch {
syncService = true for i, rule := range rules {
case types.WatchActionSyncRestart: mapping := rule.Matches(event)
restartService = true if mapping == nil {
syncService = true continue
case types.WatchActionRestart: }
restartService = true
switch rule.Action {
case types.WatchActionRebuild:
rebuild[rule.service] = true
case types.WatchActionSync:
syncfiles[rule.service] = append(syncfiles[rule.service], mapping)
case types.WatchActionRestart:
restart[rule.service] = true
case types.WatchActionSyncRestart:
syncfiles[rule.service] = append(syncfiles[rule.service], mapping)
restart[rule.service] = true
case types.WatchActionSyncExec:
syncfiles[rule.service] = append(syncfiles[rule.service], mapping)
// We want to run exec hooks only once after syncfiles if multiple file events match
// as we can't compare ServiceHook to sort and compact a slice, collect rule indexes
exec[rule.service] = append(exec[rule.service], i)
}
} }
pathMappings[i] = batch[i].PathMapping
} }
writeWatchSyncMessage(options.LogTo, serviceName, pathMappings, restartService) logrus.Debugf("watch actions: rebuild %d sync %d restart %d", len(rebuild), len(syncfiles), len(restart))
service, err := project.GetService(serviceName) if len(rebuild) > 0 {
if err != nil { err := s.rebuild(ctx, project, utils.MapKeys(rebuild), options)
return err if err != nil {
}
if syncService {
if err := syncer.Sync(ctx, service, pathMappings); err != nil {
return err return err
} }
} }
if restartService {
err = s.restart(ctx, project.Name, api.RestartOptions{ for serviceName, pathMappings := range syncfiles {
Services: []string{serviceName}, writeWatchSyncMessage(options.LogTo, serviceName, pathMappings)
err := syncer.Sync(ctx, serviceName, pathMappings)
if err != nil {
return err
}
}
if len(restart) > 0 {
services := utils.MapKeys(restart)
err := s.restart(ctx, project.Name, api.RestartOptions{
Services: services,
Project: project, Project: project,
NoDeps: false, NoDeps: false,
}) })
@ -516,12 +479,14 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
} }
options.LogTo.Log( options.LogTo.Log(
api.WatchLogger, api.WatchLogger,
fmt.Sprintf("service %q restarted", serviceName)) fmt.Sprintf("service(s) %q restarted", services))
} }
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
for _, b := range batch { for service, rulesToExec := range exec {
if b.Trigger.Action == types.WatchActionSyncExec { slices.Sort(rulesToExec)
err := s.exec(ctx, project, serviceName, b.Trigger.Exec, eg) for _, i := range slices.Compact(rulesToExec) {
err := s.exec(ctx, project, service, rules[i].Exec, eg)
if err != nil { if err != nil {
return err return err
} }
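
handleWatchBatch now fans one batch of events out into per-service action sets, and because ServiceHook values cannot be compared, exec hooks are tracked by rule index and deduplicated with sort plus compact. A compact sketch of that bookkeeping under assumed names (matched, syncFiles, execRules are illustrative):

package main

import (
	"fmt"
	"slices"
)

// matched pairs a service with the action and index of the rule that fired.
type matched struct {
	service string
	action  string // "rebuild", "sync", "restart", "sync+exec"
	rule    int
}

func main() {
	batch := []matched{
		{"web", "sync", 0}, {"web", "sync+exec", 1}, {"web", "sync+exec", 1},
		{"api", "rebuild", 2},
	}

	rebuild := map[string]bool{}
	syncFiles := map[string]int{}   // service -> number of files to sync
	execRules := map[string][]int{} // service -> matched rule indexes

	for _, m := range batch {
		switch m.action {
		case "rebuild":
			rebuild[m.service] = true
		case "sync":
			syncFiles[m.service]++
		case "sync+exec":
			syncFiles[m.service]++
			execRules[m.service] = append(execRules[m.service], m.rule)
		}
	}

	for svc, rules := range execRules {
		slices.Sort(rules)
		// each exec hook runs once per batch, however many files changed
		execRules[svc] = slices.Compact(rules)
	}

	fmt.Println(rebuild, syncFiles, execRules)
}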
@ -554,10 +519,10 @@ func (s *composeService) exec(ctx context.Context, project *types.Project, servi
return nil return nil
} }
func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error { func (s *composeService) rebuild(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName)) options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service(s) %q after changes were detected...", services))
// restrict the build to ONLY this service, not any of its dependencies // restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName} options.Build.Services = services
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil) imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)
if err != nil { if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err)) options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
@ -568,19 +533,18 @@ func (s *composeService) rebuild(ctx context.Context, project *types.Project, se
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap) s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
} }
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName)) options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service(s) %q successfully built", services))
err = s.create(ctx, project, api.CreateOptions{ err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName}, Services: services,
Inherit: true, Inherit: true,
Recreate: api.RecreateForce, Recreate: api.RecreateForce,
}) })
if err != nil { if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err)) options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate services after update. Error: %v", err))
return err return err
} }
services := []string{serviceName}
p, err := project.WithSelectedServices(services) p, err := project.WithSelectedServices(services)
if err != nil { if err != nil {
return err return err
@ -597,11 +561,7 @@ func (s *composeService) rebuild(ctx context.Context, project *types.Project, se
} }
// writeWatchSyncMessage prints out a message about the sync for the changed paths. // writeWatchSyncMessage prints out a message about the sync for the changed paths.
func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings []sync.PathMapping, restart bool) { func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings []*sync.PathMapping) {
action := "Syncing"
if restart {
action = "Syncing and restarting"
}
if logrus.IsLevelEnabled(logrus.DebugLevel) { if logrus.IsLevelEnabled(logrus.DebugLevel) {
hostPathsToSync := make([]string, len(pathMappings)) hostPathsToSync := make([]string, len(pathMappings))
for i := range pathMappings { for i := range pathMappings {
@ -610,8 +570,7 @@ func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings
log.Log( log.Log(
api.WatchLogger, api.WatchLogger,
fmt.Sprintf( fmt.Sprintf(
"%s service %q after changes were detected: %s", "Syncing service %q after changes were detected: %s",
action,
serviceName, serviceName,
strings.Join(hostPathsToSync, ", "), strings.Join(hostPathsToSync, ", "),
), ),
@ -619,7 +578,7 @@ func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings
} else { } else {
log.Log( log.Log(
api.WatchLogger, api.WatchLogger,
fmt.Sprintf("%s service %q after %d changes were detected", action, serviceName, len(pathMappings)), fmt.Sprintf("Syncing service %q after %d changes were detected", serviceName, len(pathMappings)),
) )
} }
} }
@ -648,29 +607,40 @@ func (s *composeService) pruneDanglingImagesOnRebuild(ctx context.Context, proje
// Walks develop.watch.path and checks which files should be copied inside the container // Walks develop.watch.path and checks which files should be copied inside the container
// ignores develop.watch.ignore, Dockerfile, compose files, bind mounted paths and .git // ignores develop.watch.ignore, Dockerfile, compose files, bind mounted paths and .git
func (s *composeService) initialSync(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, ignore watch.PathMatcher, syncer sync.Syncer) error { func (s *composeService) initialSync(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, syncer sync.Syncer) error {
dockerFileIgnore, err := watch.NewDockerPatternMatcher("/", []string{"Dockerfile", "*compose*.y*ml"}) dockerIgnores, err := watch.LoadDockerIgnore(service.Build)
if err != nil { if err != nil {
return err return err
} }
triggerIgnore, err := watch.NewDockerPatternMatcher("/", trigger.Ignore)
dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
if err != nil { if err != nil {
return err return err
} }
ignoreInitialSync := watch.NewCompositeMatcher(ignore, dockerFileIgnore, triggerIgnore)
triggerIgnore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
if err != nil {
return err
}
// FIXME .dockerignore
ignoreInitialSync := watch.NewCompositeMatcher(
dockerIgnores,
watch.EphemeralPathMatcher(),
dotGitIgnore,
triggerIgnore)
pathsToCopy, err := s.initialSyncFiles(ctx, project, service, trigger, ignoreInitialSync) pathsToCopy, err := s.initialSyncFiles(ctx, project, service, trigger, ignoreInitialSync)
if err != nil { if err != nil {
return err return err
} }
return syncer.Sync(ctx, service, pathsToCopy) return syncer.Sync(ctx, service.Name, pathsToCopy)
} }
// Syncs files from develop.watch.path if thy have been modified after the image has been created // Syncs files from develop.watch.path if thy have been modified after the image has been created
// //
//nolint:gocyclo //nolint:gocyclo
func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, ignore watch.PathMatcher) ([]sync.PathMapping, error) { func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Project, service types.ServiceConfig, trigger types.Trigger, ignore watch.PathMatcher) ([]*sync.PathMapping, error) {
fi, err := os.Stat(trigger.Path) fi, err := os.Stat(trigger.Path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -679,7 +649,7 @@ func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Pr
if err != nil { if err != nil {
return nil, err return nil, err
} }
var pathsToCopy []sync.PathMapping var pathsToCopy []*sync.PathMapping
switch mode := fi.Mode(); { switch mode := fi.Mode(); {
case mode.IsDir(): case mode.IsDir():
// process directory // process directory
@ -714,7 +684,7 @@ func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Pr
return err return err
} }
// only copy files (and not full directories) // only copy files (and not full directories)
pathsToCopy = append(pathsToCopy, sync.PathMapping{ pathsToCopy = append(pathsToCopy, &sync.PathMapping{
HostPath: path, HostPath: path,
ContainerPath: filepath.Join(trigger.Target, rel), ContainerPath: filepath.Join(trigger.Target, rel),
}) })
@ -724,7 +694,7 @@ func (s *composeService) initialSyncFiles(ctx context.Context, project *types.Pr
case mode.IsRegular(): case mode.IsRegular():
// process file // process file
if fi.ModTime().After(timeImageCreated) && !shouldIgnore(filepath.Base(trigger.Path), ignore) && !checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) { if fi.ModTime().After(timeImageCreated) && !shouldIgnore(filepath.Base(trigger.Path), ignore) && !checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
pathsToCopy = append(pathsToCopy, sync.PathMapping{ pathsToCopy = append(pathsToCopy, &sync.PathMapping{
HostPath: trigger.Path, HostPath: trigger.Path,
ContainerPath: trigger.Target, ContainerPath: trigger.Target,
}) })
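
initialSyncFiles only copies paths whose modification time is newer than the image creation time and that are neither ignored nor already bind mounted. The core timestamp check in isolation (newerThanImage is an illustrative name):

package main

import (
	"fmt"
	"os"
	"time"
)

// newerThanImage reports whether a host file changed after the image was built,
// the condition initial sync uses to decide what must be copied into the container.
func newerThanImage(path string, imageCreated time.Time) (bool, error) {
	fi, err := os.Stat(path)
	if err != nil {
		return false, err
	}
	return fi.ModTime().After(imageCreated), nil
}

func main() {
	ok, err := newerThanImage("/etc/hostname", time.Now().Add(-24*time.Hour))
	fmt.Println(ok, err)
}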

View File

@ -18,8 +18,6 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"slices"
"strings"
"testing" "testing"
"time" "time"
@ -38,53 +36,6 @@ import (
"gotest.tools/v3/assert" "gotest.tools/v3/assert"
) )
func TestDebounceBatching(t *testing.T) {
ch := make(chan fileEvent)
clock := clockwork.NewFakeClock()
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)
trigger := types.Trigger{
Path: "/",
}
matcher := watch.EmptyMatcher{}
eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch)
for i := 0; i < 100; i++ {
path := "/a"
if i%2 == 0 {
path = "/b"
}
event := maybeFileEvent(trigger, path, matcher)
require.NotNil(t, event)
ch <- *event
}
// we sent 100 events + the debouncer
clock.BlockUntil(101)
clock.Advance(quietPeriod)
select {
case batch := <-eventBatchCh:
slices.SortFunc(batch, func(a, b fileEvent) int {
return strings.Compare(a.HostPath, b.HostPath)
})
assert.Equal(t, len(batch), 2)
assert.Equal(t, batch[0].HostPath, "/a")
assert.Equal(t, batch[1].HostPath, "/b")
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for events")
}
clock.BlockUntil(1)
clock.Advance(quietPeriod)
// there should only be a single batch
select {
case batch := <-eventBatchCh:
t.Fatalf("unexpected events: %v", batch)
case <-time.After(50 * time.Millisecond):
// channel is empty
}
}
type testWatcher struct { type testWatcher struct {
events chan watch.FileEvent events chan watch.FileEvent
errors chan error errors chan error
@ -170,32 +121,37 @@ func TestWatch_Sync(t *testing.T) {
dockerCli: cli, dockerCli: cli,
clock: clock, clock: clock,
} }
err := service.watchEvents(ctx, &proj, "test", api.WatchOptions{ rules, err := getWatchRules(&types.DevelopConfig{
Watch: []types.Trigger{
{
Path: "/sync",
Action: "sync",
Target: "/work",
Ignore: []string{"ignore"},
},
{
Path: "/rebuild",
Action: "rebuild",
},
},
}, types.ServiceConfig{Name: "test"})
assert.NilError(t, err)
err = service.watchEvents(ctx, &proj, api.WatchOptions{
Build: &api.BuildOptions{}, Build: &api.BuildOptions{},
LogTo: stdLogger{}, LogTo: stdLogger{},
Prune: true, Prune: true,
}, watcher, syncer, []types.Trigger{ }, watcher, syncer, rules)
{
Path: "/sync",
Action: "sync",
Target: "/work",
Ignore: []string{"ignore"},
},
{
Path: "/rebuild",
Action: "rebuild",
},
})
assert.NilError(t, err) assert.NilError(t, err)
}() }()
watcher.Events() <- watch.NewFileEvent("/sync/changed") watcher.Events() <- watch.NewFileEvent("/sync/changed")
watcher.Events() <- watch.NewFileEvent("/sync/changed/sub") watcher.Events() <- watch.NewFileEvent("/sync/changed/sub")
clock.BlockUntil(3) clock.BlockUntil(3)
clock.Advance(quietPeriod) clock.Advance(watch.QuietPeriod)
select { select {
case actual := <-syncer.synced: case actual := <-syncer.synced:
require.ElementsMatch(t, []sync.PathMapping{ require.ElementsMatch(t, []*sync.PathMapping{
{HostPath: "/sync/changed", ContainerPath: "/work/changed"}, {HostPath: "/sync/changed", ContainerPath: "/work/changed"},
{HostPath: "/sync/changed/sub", ContainerPath: "/work/changed/sub"}, {HostPath: "/sync/changed/sub", ContainerPath: "/work/changed/sub"},
}, actual) }, actual)
@ -203,24 +159,10 @@ func TestWatch_Sync(t *testing.T) {
t.Error("timeout") t.Error("timeout")
} }
watcher.Events() <- watch.NewFileEvent("/sync/ignore")
watcher.Events() <- watch.NewFileEvent("/sync/ignore/sub")
watcher.Events() <- watch.NewFileEvent("/sync/changed")
clock.BlockUntil(4)
clock.Advance(quietPeriod)
select {
case actual := <-syncer.synced:
require.ElementsMatch(t, []sync.PathMapping{
{HostPath: "/sync/changed", ContainerPath: "/work/changed"},
}, actual)
case <-time.After(100 * time.Millisecond):
t.Error("timed out waiting for events")
}
watcher.Events() <- watch.NewFileEvent("/rebuild") watcher.Events() <- watch.NewFileEvent("/rebuild")
watcher.Events() <- watch.NewFileEvent("/sync/changed") watcher.Events() <- watch.NewFileEvent("/sync/changed")
clock.BlockUntil(4) clock.BlockUntil(4)
clock.Advance(quietPeriod) clock.Advance(watch.QuietPeriod)
select { select {
case batch := <-syncer.synced: case batch := <-syncer.synced:
t.Fatalf("received unexpected events: %v", batch) t.Fatalf("received unexpected events: %v", batch)
@ -231,16 +173,16 @@ func TestWatch_Sync(t *testing.T) {
} }
type fakeSyncer struct { type fakeSyncer struct {
synced chan []sync.PathMapping synced chan []*sync.PathMapping
} }
func newFakeSyncer() *fakeSyncer { func newFakeSyncer() *fakeSyncer {
return &fakeSyncer{ return &fakeSyncer{
synced: make(chan []sync.PathMapping), synced: make(chan []*sync.PathMapping),
} }
} }
func (f *fakeSyncer) Sync(_ context.Context, _ types.ServiceConfig, paths []sync.PathMapping) error { func (f *fakeSyncer) Sync(ctx context.Context, service string, paths []*sync.PathMapping) error {
f.synced <- paths f.synced <- paths
return nil return nil
} }

View File

@ -93,10 +93,7 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
t.Log("wait for watch to start watching") t.Log("wait for watch to start watching")
c.WaitForCondition(t, func() (bool, string) { c.WaitForCondition(t, func() (bool, string) {
out := r.String() out := r.String()
errors := r.String() return strings.Contains(out, "Watch enabled"), "watch not started"
return strings.Contains(out,
"Watch configuration"), fmt.Sprintf("'Watch configuration' not found in : \n%s\nStderr: \n%s\n", out,
errors)
}, 30*time.Second, 1*time.Second) }, 30*time.Second, 1*time.Second)
pn := c.RunDockerCmd(t, "inspect", containerName, "-f", "{{ .HostConfig.NetworkMode }}") pn := c.RunDockerCmd(t, "inspect", containerName, "-f", "{{ .HostConfig.NetworkMode }}")
@ -112,7 +109,7 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
t.Log("check if the container has been rebuild") t.Log("check if the container has been rebuild")
c.WaitForCondition(t, func() (bool, string) { c.WaitForCondition(t, func() (bool, string) {
out := r.String() out := r.String()
if strings.Count(out, "batch complete: service["+svcName+"]") != 1 { if strings.Count(out, "batch complete") != 1 {
return false, fmt.Sprintf("container %s was not rebuilt", containerName) return false, fmt.Sprintf("container %s was not rebuilt", containerName)
} }
return true, fmt.Sprintf("container %s was rebuilt", containerName) return true, fmt.Sprintf("container %s was rebuilt", containerName)
@ -283,7 +280,7 @@ func doTest(t *testing.T, svcName string) {
return poll.Continue("%v", r.Combined()) return poll.Continue("%v", r.Combined())
} }
} }
poll.WaitOn(t, checkRestart(fmt.Sprintf("service %q restarted", svcName))) poll.WaitOn(t, checkRestart(fmt.Sprintf("service(s) [%q] restarted", svcName)))
poll.WaitOn(t, checkFileContents("/app/config/file.config", "This is an updated config file")) poll.WaitOn(t, checkFileContents("/app/config/file.config", "This is an updated config file"))
testComplete.Store(true) testComplete.Store(true)

pkg/watch/debounce.go (new file, 73 lines)
View File

@ -0,0 +1,73 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"time"
"github.com/docker/compose/v2/pkg/utils"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
)
const QuietPeriod = 500 * time.Millisecond
// batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
// channel.
//
// The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
func BatchDebounceEvents(ctx context.Context, clock clockwork.Clock, input <-chan FileEvent) <-chan []FileEvent {
out := make(chan []FileEvent)
go func() {
defer close(out)
seen := utils.Set[FileEvent]{}
flushEvents := func() {
if len(seen) == 0 {
return
}
logrus.Debugf("flush: %d events %s", len(seen), seen)
events := make([]FileEvent, 0, len(seen))
for e := range seen {
events = append(events, e)
}
out <- events
seen = utils.Set[FileEvent]{}
}
t := clock.NewTicker(QuietPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.Chan():
flushEvents()
case e, ok := <-input:
if !ok {
// input channel was closed
flushEvents()
return
}
if _, ok := seen[e]; !ok {
seen.Add(e)
}
t.Reset(QuietPeriod)
}
}
}()
return out
}
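
A usage sketch of the debouncer above with a real clock, assuming the package is importable at github.com/docker/compose/v2/pkg/watch as laid out in this commit; paths and timings are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/docker/compose/v2/pkg/watch"
	"github.com/jonboulle/clockwork"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan watch.FileEvent)
	batches := watch.BatchDebounceEvents(ctx, clockwork.NewRealClock(), in)

	go func() {
		// two quick saves of the same file are deduplicated, and a second file
		// arriving within the quiet period joins the same batch
		in <- watch.NewFileEvent("/src/app/main.go")
		in <- watch.NewFileEvent("/src/app/main.go")
		in <- watch.NewFileEvent("/src/app/util.go")
	}()

	select {
	case b := <-batches:
		fmt.Println("batch:", b) // expect two distinct events
	case <-time.After(2 * time.Second):
		fmt.Println("no batch received")
	}
}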

View File

@ -0,0 +1,64 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"slices"
"testing"
"time"
"github.com/jonboulle/clockwork"
"gotest.tools/v3/assert"
)
func Test_BatchDebounceEvents(t *testing.T) {
ch := make(chan FileEvent)
clock := clockwork.NewFakeClock()
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)
eventBatchCh := BatchDebounceEvents(ctx, clock, ch)
for i := 0; i < 100; i++ {
path := "/a"
if i%2 == 0 {
path = "/b"
}
ch <- FileEvent(path)
}
// we sent 100 events + the debouncer
clock.BlockUntil(101)
clock.Advance(QuietPeriod)
select {
case batch := <-eventBatchCh:
slices.Sort(batch)
assert.Equal(t, len(batch), 2)
assert.Equal(t, batch[0], FileEvent("/a"))
assert.Equal(t, batch[1], FileEvent("/b"))
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for events")
}
clock.BlockUntil(1)
clock.Advance(QuietPeriod)
// there should only be a single batch
select {
case batch := <-eventBatchCh:
t.Fatalf("unexpected events: %v", batch)
case <-time.After(50 * time.Millisecond):
// channel is empty
}
}

View File

@ -30,19 +30,13 @@ import (
var numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches") var numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches")
type FileEvent struct { type FileEvent string
path string
}
func NewFileEvent(p string) FileEvent { func NewFileEvent(p string) FileEvent {
if !filepath.IsAbs(p) { if !filepath.IsAbs(p) {
panic(fmt.Sprintf("NewFileEvent only accepts absolute paths. Actual: %s", p)) panic(fmt.Sprintf("NewFileEvent only accepts absolute paths. Actual: %s", p))
} }
return FileEvent{path: p} return FileEvent(p)
}
func (e FileEvent) Path() string {
return e.path
} }
type Notify interface { type Notify interface {
@ -81,8 +75,8 @@ func (EmptyMatcher) MatchesEntireDir(f string) (bool, error) { return false, nil
var _ PathMatcher = EmptyMatcher{} var _ PathMatcher = EmptyMatcher{}
func NewWatcher(paths []string, ignore PathMatcher) (Notify, error) { func NewWatcher(paths []string) (Notify, error) {
return newWatcher(paths, ignore) return newWatcher(paths)
} }
const WindowsBufferSizeEnvVar = "COMPOSE_WATCH_WINDOWS_BUFFER_SIZE" const WindowsBufferSizeEnvVar = "COMPOSE_WATCH_WINDOWS_BUFFER_SIZE"
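
With ignore handling lifted out of the watcher, callers now construct it from paths alone and filter events themselves. A hedged consumption sketch against the Notify interface shown in this commit, again assuming the github.com/docker/compose/v2/pkg/watch import path:

package main

import (
	"fmt"

	"github.com/docker/compose/v2/pkg/watch"
)

func main() {
	w, err := watch.NewWatcher([]string{"/src/app"})
	if err != nil {
		panic(err)
	}
	if err := w.Start(); err != nil {
		panic(err)
	}
	defer w.Close()

	// runs until an error is reported by the underlying watcher
	for {
		select {
		case err := <-w.Errors():
			fmt.Println("watch error:", err)
			return
		case e := <-w.Events():
			// FileEvent is now a plain string holding the absolute host path
			fmt.Println("changed:", string(e))
		}
	}
}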

View File

@ -485,96 +485,6 @@ func TestWatchCountInnerFile(t *testing.T) {
assert.Equal(t, expectedWatches, int(numberOfWatches.Value())) assert.Equal(t, expectedWatches, int(numberOfWatches.Value()))
} }
func TestWatchCountInnerFileWithIgnore(t *testing.T) {
f := newNotifyFixture(t)
root := f.paths[0]
ignore, _ := NewDockerPatternMatcher(root, []string{
"a",
"!a/b",
})
f.setIgnore(ignore)
a := f.JoinPath(root, "a")
b := f.JoinPath(a, "b")
file := f.JoinPath(b, "bigFile")
f.WriteFile(file, "hello")
f.assertEvents(b, file)
expectedWatches := 3
if isRecursiveWatcher() {
expectedWatches = 1
}
assert.Equal(t, expectedWatches, int(numberOfWatches.Value()))
}
func TestIgnoreCreatedDir(t *testing.T) {
f := newNotifyFixture(t)
root := f.paths[0]
ignore, _ := NewDockerPatternMatcher(root, []string{"a/b"})
f.setIgnore(ignore)
a := f.JoinPath(root, "a")
b := f.JoinPath(a, "b")
file := f.JoinPath(b, "bigFile")
f.WriteFile(file, "hello")
f.assertEvents(a)
expectedWatches := 2
if isRecursiveWatcher() {
expectedWatches = 1
}
assert.Equal(t, expectedWatches, int(numberOfWatches.Value()))
}
func TestIgnoreCreatedDirWithExclusions(t *testing.T) {
f := newNotifyFixture(t)
root := f.paths[0]
ignore, _ := NewDockerPatternMatcher(root,
[]string{
"a/b",
"c",
"!c/d",
})
f.setIgnore(ignore)
a := f.JoinPath(root, "a")
b := f.JoinPath(a, "b")
file := f.JoinPath(b, "bigFile")
f.WriteFile(file, "hello")
f.assertEvents(a)
expectedWatches := 2
if isRecursiveWatcher() {
expectedWatches = 1
}
assert.Equal(t, expectedWatches, int(numberOfWatches.Value()))
}
func TestIgnoreInitialDir(t *testing.T) {
f := newNotifyFixture(t)
root := f.TempDir("root")
ignore, _ := NewDockerPatternMatcher(root, []string{"a/b"})
f.setIgnore(ignore)
a := f.JoinPath(root, "a")
b := f.JoinPath(a, "b")
file := f.JoinPath(b, "bigFile")
f.WriteFile(file, "hello")
f.watch(root)
f.assertEvents()
expectedWatches := 3
if isRecursiveWatcher() {
expectedWatches = 2
}
assert.Equal(t, expectedWatches, int(numberOfWatches.Value()))
}
func isRecursiveWatcher() bool { func isRecursiveWatcher() bool {
return runtime.GOOS == "darwin" || runtime.GOOS == "windows" return runtime.GOOS == "darwin" || runtime.GOOS == "windows"
} }
@ -585,7 +495,6 @@ type notifyFixture struct {
out *bytes.Buffer out *bytes.Buffer
*TempDirFixture *TempDirFixture
notify Notify notify Notify
ignore PathMatcher
paths []string paths []string
events []FileEvent events []FileEvent
} }
@ -598,7 +507,6 @@ func newNotifyFixture(t *testing.T) *notifyFixture {
cancel: cancel, cancel: cancel,
TempDirFixture: NewTempDirFixture(t), TempDirFixture: NewTempDirFixture(t),
paths: []string{}, paths: []string{},
ignore: EmptyMatcher{},
out: out, out: out,
} }
nf.watch(nf.TempDir("watched")) nf.watch(nf.TempDir("watched"))
@ -606,11 +514,6 @@ func newNotifyFixture(t *testing.T) *notifyFixture {
return nf return nf
} }
func (f *notifyFixture) setIgnore(ignore PathMatcher) {
f.ignore = ignore
f.rebuildWatcher()
}
func (f *notifyFixture) watch(path string) { func (f *notifyFixture) watch(path string) {
f.paths = append(f.paths, path) f.paths = append(f.paths, path)
f.rebuildWatcher() f.rebuildWatcher()
@ -624,7 +527,7 @@ func (f *notifyFixture) rebuildWatcher() {
} }
// create a new watcher // create a new watcher
notify, err := NewWatcher(f.paths, f.ignore) notify, err := NewWatcher(f.paths)
if err != nil { if err != nil {
f.T().Fatal(err) f.T().Fatal(err)
} }
@ -648,7 +551,7 @@ func (f *notifyFixture) assertEvents(expected ...string) {
} }
for i, actual := range f.events { for i, actual := range f.events {
e := FileEvent{expected[i]} e := FileEvent(expected[i])
if actual != e { if actual != e {
f.T().Fatalf("Got event %v (expected %v)", actual, e) f.T().Fatalf("Got event %v (expected %v)", actual, e)
} }
@ -702,16 +605,16 @@ F:
f.T().Fatal(err) f.T().Fatal(err)
case event := <-f.notify.Events(): case event := <-f.notify.Events():
if strings.Contains(event.Path(), syncPath) { if strings.Contains(string(event), syncPath) {
break F break F
} }
if strings.Contains(event.Path(), anySyncPath) { if strings.Contains(string(event), anySyncPath) {
continue continue
} }
// Don't bother tracking duplicate changes to the same path // Don't bother tracking duplicate changes to the same path
// for testing. // for testing.
if len(f.events) > 0 && f.events[len(f.events)-1].Path() == event.Path() { if len(f.events) > 0 && f.events[len(f.events)-1] == event {
continue continue
} }

View File

@ -27,7 +27,6 @@ import (
pathutil "github.com/docker/compose/v2/internal/paths" pathutil "github.com/docker/compose/v2/internal/paths"
"github.com/fsnotify/fsevents" "github.com/fsnotify/fsevents"
"github.com/sirupsen/logrus"
) )
// A file watcher optimized for Darwin. // A file watcher optimized for Darwin.
@ -39,7 +38,6 @@ type fseventNotify struct {
stop chan struct{} stop chan struct{}
pathsWereWatching map[string]interface{} pathsWereWatching map[string]interface{}
ignore PathMatcher
} }
func (d *fseventNotify) loop() { func (d *fseventNotify) loop() {
@ -62,14 +60,6 @@ func (d *fseventNotify) loop() {
continue continue
} }
ignore, err := d.ignore.Matches(e.Path)
if err != nil {
logrus.Infof("Error matching path %q: %v", e.Path, err)
} else if ignore {
logrus.Tracef("Ignoring event for path: %v", e.Path)
continue
}
d.events <- NewFileEvent(e.Path) d.events <- NewFileEvent(e.Path)
} }
} }
@ -118,9 +108,8 @@ func (d *fseventNotify) Errors() chan error {
return d.errors return d.errors
} }
func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { func newWatcher(paths []string) (Notify, error) {
dw := &fseventNotify{ dw := &fseventNotify{
ignore: ignore,
stream: &fsevents.EventStream{ stream: &fsevents.EventStream{
Latency: 50 * time.Millisecond, Latency: 50 * time.Millisecond,
Flags: fsevents.FileEvents | fsevents.IgnoreSelf, Flags: fsevents.FileEvents | fsevents.IgnoreSelf,

View File

@ -46,8 +46,6 @@ type naiveNotify struct {
// structure, so we can filter the list quickly. // structure, so we can filter the list quickly.
notifyList map[string]bool notifyList map[string]bool
ignore PathMatcher
isWatcherRecursive bool isWatcherRecursive bool
watcher *fsnotify.Watcher watcher *fsnotify.Watcher
events chan fsnotify.Event events chan fsnotify.Event
@ -122,12 +120,7 @@ func (d *naiveNotify) watchRecursively(dir string) error {
return nil return nil
} }
shouldSkipDir, err := d.shouldSkipDir(path) if d.shouldSkipDir(path) {
if err != nil {
return err
}
if shouldSkipDir {
logrus.Debugf("Ignoring directory and its contents (recursively): %s", path) logrus.Debugf("Ignoring directory and its contents (recursively): %s", path)
return filepath.SkipDir return filepath.SkipDir
} }
@ -168,14 +161,14 @@ func (d *naiveNotify) loop() { //nolint:gocyclo
if e.Op&fsnotify.Create != fsnotify.Create { if e.Op&fsnotify.Create != fsnotify.Create {
if d.shouldNotify(e.Name) { if d.shouldNotify(e.Name) {
d.wrappedEvents <- FileEvent{e.Name} d.wrappedEvents <- FileEvent(e.Name)
} }
continue continue
} }
if d.isWatcherRecursive { if d.isWatcherRecursive {
if d.shouldNotify(e.Name) { if d.shouldNotify(e.Name) {
d.wrappedEvents <- FileEvent{e.Name} d.wrappedEvents <- FileEvent(e.Name)
} }
continue continue
} }
@ -191,7 +184,7 @@ func (d *naiveNotify) loop() { //nolint:gocyclo
} }
if d.shouldNotify(path) { if d.shouldNotify(path) {
d.wrappedEvents <- FileEvent{path} d.wrappedEvents <- FileEvent(path)
} }
// TODO(dmiller): symlinks 😭 // TODO(dmiller): symlinks 😭
@ -199,11 +192,7 @@ func (d *naiveNotify) loop() { //nolint:gocyclo
shouldWatch := false shouldWatch := false
if info.IsDir() { if info.IsDir() {
// watch directories unless we can skip them entirely // watch directories unless we can skip them entirely
shouldSkipDir, err := d.shouldSkipDir(path) if d.shouldSkipDir(path) {
if err != nil {
return err
}
if shouldSkipDir {
return filepath.SkipDir return filepath.SkipDir
} }
@ -230,14 +219,6 @@ func (d *naiveNotify) loop() { //nolint:gocyclo
} }
func (d *naiveNotify) shouldNotify(path string) bool { func (d *naiveNotify) shouldNotify(path string) bool {
ignore, err := d.ignore.Matches(path)
if err != nil {
logrus.Infof("Error matching path %q: %v", path, err)
} else if ignore {
logrus.Tracef("Ignoring event for path: %v", path)
return false
}
if _, ok := d.notifyList[path]; ok { if _, ok := d.notifyList[path]; ok {
// We generally don't care when directories change at the root of an ADD // We generally don't care when directories change at the root of an ADD
stat, err := os.Lstat(path) stat, err := os.Lstat(path)
@ -253,19 +234,10 @@ func (d *naiveNotify) shouldNotify(path string) bool {
return false return false
} }
func (d *naiveNotify) shouldSkipDir(path string) (bool, error) { func (d *naiveNotify) shouldSkipDir(path string) bool {
// If path is directly in the notifyList, we should always watch it. // If path is directly in the notifyList, we should always watch it.
if d.notifyList[path] { if d.notifyList[path] {
return false, nil return false
}
skip, err := d.ignore.MatchesEntireDir(path)
if err != nil {
return false, fmt.Errorf("shouldSkipDir: %w", err)
}
if skip {
return true, nil
} }
// Suppose we're watching // Suppose we're watching
@ -282,10 +254,10 @@ func (d *naiveNotify) shouldSkipDir(path string) (bool, error) {
// (i.e., to cover the "path doesn't exist" case). // (i.e., to cover the "path doesn't exist" case).
for root := range d.notifyList { for root := range d.notifyList {
if pathutil.IsChild(root, path) || pathutil.IsChild(path, root) { if pathutil.IsChild(root, path) || pathutil.IsChild(path, root) {
return false, nil return false
} }
} }
return true, nil return true
} }
func (d *naiveNotify) add(path string) error { func (d *naiveNotify) add(path string) error {
@ -298,11 +270,7 @@ func (d *naiveNotify) add(path string) error {
return nil return nil
} }
func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { func newWatcher(paths []string) (Notify, error) {
if ignore == nil {
return nil, fmt.Errorf("newWatcher: ignore is nil")
}
fsw, err := fsnotify.NewWatcher() fsw, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
if strings.Contains(err.Error(), "too many open files") && runtime.GOOS == "linux" { if strings.Contains(err.Error(), "too many open files") && runtime.GOOS == "linux" {
@ -332,7 +300,6 @@ func newWatcher(paths []string, ignore PathMatcher) (Notify, error) {
wmw := &naiveNotify{ wmw := &naiveNotify{
notifyList: notifyList, notifyList: notifyList,
ignore: ignore,
watcher: fsw, watcher: fsw,
events: fsw.Events, events: fsw.Events,
wrappedEvents: wrappedEvents, wrappedEvents: wrappedEvents,