diff --git a/pkg/watch/features.go b/pkg/watch/features.go deleted file mode 100644 index efc8b890b..000000000 --- a/pkg/watch/features.go +++ /dev/null @@ -1,27 +0,0 @@ -package watch - -import ( - "os" - "strconv" -) - -const CheckLimitKey = "WM_CHECK_LIMIT" - -var limitChecksEnabled = true - -// Allows limit checks to be disabled for testing. -func SetLimitChecksEnabled(enabled bool) { - limitChecksEnabled = enabled -} - -func LimitChecksEnabled() bool { - env, ok := os.LookupEnv(CheckLimitKey) - if ok { - enabled, err := strconv.ParseBool(env) - if err == nil { - return enabled - } - } - - return limitChecksEnabled -} diff --git a/pkg/watch/notify_test.go b/pkg/watch/notify_test.go index ed75e444f..a70a75be2 100644 --- a/pkg/watch/notify_test.go +++ b/pkg/watch/notify_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "strings" "testing" "time" @@ -329,7 +330,7 @@ func TestWatchBothDirAndFile(t *testing.T) { f.assertEvents(fileB) } -func TestWatchNonexistentDirectory(t *testing.T) { +func TestWatchNonexistentFileInNonexistentDirectoryCreatedSimultaneously(t *testing.T) { f := newNotifyFixture(t) defer f.tearDown() @@ -347,6 +348,69 @@ func TestWatchNonexistentDirectory(t *testing.T) { f.assertEvents(file) } +func TestWatchNonexistentDirectory(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + + root := f.JoinPath("root") + err := os.Mkdir(root, 0777) + if err != nil { + t.Fatal(err) + } + parent := f.JoinPath("parent") + file := f.JoinPath("parent", "a") + + f.watch(parent) + f.fsync() + f.events = nil + + err = os.Mkdir(parent, 0777) + if err != nil { + t.Fatal(err) + } + + if runtime.GOOS == "darwin" { + // for directories that were the root of an Add, we don't report creation, cf. watcher_darwin.go + f.assertEvents() + } else { + f.assertEvents(parent) + } + f.WriteFile(file, "hello") + + if runtime.GOOS == "darwin" { + // mac doesn't return the dir change as part of file creation + f.assertEvents(file) + } else { + f.assertEvents(parent, file) + } +} + +// doesn't work on linux +// func TestWatchNonexistentFileInNonexistentDirectory(t *testing.T) { +// f := newNotifyFixture(t) +// defer f.tearDown() + +// root := f.JoinPath("root") +// err := os.Mkdir(root, 0777) +// if err != nil { +// t.Fatal(err) +// } +// parent := f.JoinPath("parent") +// file := f.JoinPath("parent", "a") + +// f.watch(file) +// f.assertEvents() + +// err = os.Mkdir(parent, 0777) +// if err != nil { +// t.Fatal(err) +// } + +// f.assertEvents() +// f.WriteFile(file, "hello") +// f.assertEvents(file) +// } + type notifyFixture struct { *tempdir.TempDirFixture notify Notify @@ -355,7 +419,6 @@ type notifyFixture struct { } func newNotifyFixture(t *testing.T) *notifyFixture { - SetLimitChecksEnabled(false) notify, err := NewWatcher() if err != nil { t.Fatal(err) @@ -434,12 +497,20 @@ F: } func (f *notifyFixture) tearDown() { - SetLimitChecksEnabled(true) - err := f.notify.Close() if err != nil { f.T().Fatal(err) } + // drain channels from watcher + go func() { + for _ = range f.notify.Events() { + } + }() + go func() { + for _ = range f.notify.Errors() { + } + }() + f.TempDirFixture.TearDown() } diff --git a/pkg/watch/watcher_naive.go b/pkg/watch/watcher_naive.go index bb240c9ba..4a7656c79 100644 --- a/pkg/watch/watcher_naive.go +++ b/pkg/watch/watcher_naive.go @@ -7,6 +7,7 @@ import ( "log" "os" "path/filepath" + "sync" "github.com/pkg/errors" "github.com/windmilleng/fsnotify" @@ -24,6 +25,8 @@ type naiveNotify struct { wrappedEvents chan FileEvent errors chan error + mu sync.Mutex + // Paths that we're watching that should be passed up to the caller. // Note that we may have to watch ancestors of these paths // in order to fulfill the API promise. @@ -48,11 +51,14 @@ func (d *naiveNotify) Add(name string) error { return errors.Wrapf(err, "notify.Add(%q)", name) } } else { - err = d.watcher.Add(name) + err = d.watcher.Add(filepath.Dir(name)) if err != nil { - return errors.Wrapf(err, "notify.Add(%q)", name) + return errors.Wrapf(err, "notify.Add(%q)", filepath.Dir(name)) } } + + d.mu.Lock() + defer d.mu.Unlock() d.notifyList[name] = true return nil @@ -64,6 +70,9 @@ func (d *naiveNotify) watchRecursively(dir string) error { return err } + if !mode.IsDir() { + return nil + } err = d.watcher.Add(path) if err != nil { if os.IsNotExist(err) { @@ -106,56 +115,63 @@ func (d *naiveNotify) Errors() chan error { } func (d *naiveNotify) loop() { + defer close(d.wrappedEvents) for e := range d.events { - isCreateOp := e.Op&fsnotify.Create == fsnotify.Create - shouldWalk := false - if isCreateOp { - isDir, err := isDir(e.Name) - if err != nil { - log.Printf("Error stat-ing file %s: %s", e.Name, err) - continue + shouldNotify := d.shouldNotify(e.Name) + + if e.Op&fsnotify.Create != fsnotify.Create { + if shouldNotify { + d.wrappedEvents <- FileEvent{e.Name} } - shouldWalk = isDir + continue } - if shouldWalk { - err := filepath.Walk(e.Name, func(path string, mode os.FileInfo, err error) error { - if err != nil { - return err - } - newE := fsnotify.Event{ - Op: fsnotify.Create, - Name: path, - } - if d.shouldNotify(newE) { - d.wrappedEvents <- FileEvent{newE.Name} - - // TODO(dmiller): symlinks 😭 - err = d.Add(path) - if err != nil { - log.Printf("Error watching path %s: %s", e.Name, err) - } - } - return nil - }) + // TODO(dbentley): if there's a delete should we call d.watcher.Remove to prevent leaking? + if err := filepath.Walk(e.Name, func(path string, mode os.FileInfo, err error) error { if err != nil { - log.Printf("Error walking directory %s: %s", e.Name, err) + return err } - } else if d.shouldNotify(e) { - d.wrappedEvents <- FileEvent{e.Name} + + if d.shouldNotify(path) { + d.wrappedEvents <- FileEvent{path} + } + + // TODO(dmiller): symlinks 😭 + + shouldWatch := false + if mode.IsDir() { + // watch all directories + shouldWatch = true + } else { + // watch files that are explicitly named, but don't watch others + _, ok := d.notifyList[path] + if ok { + shouldWatch = true + } + } + if shouldWatch { + err := d.watcher.Add(path) + if err != nil { + log.Printf("Error watching path %s: %s", e.Name, err) + } + } + return nil + }); err != nil { + log.Printf("Error walking directory %s: %s", e.Name, err) } } } -func (d *naiveNotify) shouldNotify(e fsnotify.Event) bool { - if _, ok := d.notifyList[e.Name]; ok { +func (d *naiveNotify) shouldNotify(path string) bool { + d.mu.Lock() + defer d.mu.Unlock() + if _, ok := d.notifyList[path]; ok { return true - } else { - // TODO(dmiller): maybe use a prefix tree here? - for path := range d.notifyList { - if ospath.IsChild(path, e.Name) { - return true - } + } + // TODO(dmiller): maybe use a prefix tree here? + for root := range d.notifyList { + if ospath.IsChild(root, path) { + return true } } return false diff --git a/pkg/watch/watcher_naive_test.go b/pkg/watch/watcher_naive_test.go new file mode 100644 index 000000000..3b3419ba2 --- /dev/null +++ b/pkg/watch/watcher_naive_test.go @@ -0,0 +1,110 @@ +// +build !darwin + +package watch + +import ( + "fmt" + "os" + "os/exec" + "strconv" + "strings" + "testing" +) + +func TestDontWatchEachFile(t *testing.T) { + // fsnotify is not recursive, so we need to watch each directory + // you can watch individual files with fsnotify, but that is more prone to exhaust resources + // this test uses a Linux way to get the number of watches to make sure we're watching + // per-directory, not per-file + f := newNotifyFixture(t) + defer f.tearDown() + + watched := f.TempDir("watched") + + // there are a few different cases we want to test for because the code paths are slightly + // different: + // 1) initial: data there before we ever call watch + // 2) inplace: data we create while the watch is happening + // 3) staged: data we create in another directory and then atomically move into place + + // initial + f.WriteFile(f.JoinPath(watched, "initial.txt"), "initial data") + + initialDir := f.JoinPath(watched, "initial_dir") + if err := os.Mkdir(initialDir, 0777); err != nil { + t.Fatal(err) + } + + for i := 0; i < 100; i++ { + f.WriteFile(f.JoinPath(initialDir, fmt.Sprintf("%d", i)), "initial data") + } + + f.watch(watched) + f.fsync() + if len(f.events) != 0 { + t.Fatalf("expected 0 initial events; got %d events: %v", len(f.events), f.events) + } + f.events = nil + + // inplace + inplace := f.JoinPath(watched, "inplace") + if err := os.Mkdir(inplace, 0777); err != nil { + t.Fatal(err) + } + f.WriteFile(f.JoinPath(inplace, "inplace.txt"), "inplace data") + + inplaceDir := f.JoinPath(inplace, "inplace_dir") + if err := os.Mkdir(inplaceDir, 0777); err != nil { + t.Fatal(err) + } + + for i := 0; i < 100; i++ { + f.WriteFile(f.JoinPath(inplaceDir, fmt.Sprintf("%d", i)), "inplace data") + } + + f.fsync() + if len(f.events) < 100 { + t.Fatalf("expected >100 inplace events; got %d events: %v", len(f.events), f.events) + } + f.events = nil + + // staged + staged := f.TempDir("staged") + f.WriteFile(f.JoinPath(staged, "staged.txt"), "staged data") + + stagedDir := f.JoinPath(staged, "staged_dir") + if err := os.Mkdir(stagedDir, 0777); err != nil { + t.Fatal(err) + } + + for i := 0; i < 100; i++ { + f.WriteFile(f.JoinPath(stagedDir, fmt.Sprintf("%d", i)), "staged data") + } + + if err := os.Rename(staged, f.JoinPath(watched, "staged")); err != nil { + t.Fatal(err) + } + + f.fsync() + if len(f.events) < 100 { + t.Fatalf("expected >100 staged events; got %d events: %v", len(f.events), f.events) + } + f.events = nil + + pid := os.Getpid() + + output, err := exec.Command("bash", "-c", fmt.Sprintf( + "find /proc/%d/fd -lname anon_inode:inotify -printf '%%hinfo/%%f\n' | xargs cat | grep -c '^inotify'", pid)).Output() + if err != nil { + t.Fatalf("error running command to determine number of watched files: %v", err) + } + + n, err := strconv.Atoi(strings.TrimSpace(string(output))) + if err != nil { + t.Fatalf("couldn't parse number of watched files: %v", err) + } + + if n > 10 { + t.Fatalf("watching more than 10 files: %d", n) + } +}