diff --git a/pkg/watch/features.go b/pkg/watch/features.go new file mode 100644 index 000000000..efc8b890b --- /dev/null +++ b/pkg/watch/features.go @@ -0,0 +1,27 @@ +package watch + +import ( + "os" + "strconv" +) + +const CheckLimitKey = "WM_CHECK_LIMIT" + +var limitChecksEnabled = true + +// Allows limit checks to be disabled for testing. +func SetLimitChecksEnabled(enabled bool) { + limitChecksEnabled = enabled +} + +func LimitChecksEnabled() bool { + env, ok := os.LookupEnv(CheckLimitKey) + if ok { + enabled, err := strconv.ParseBool(env) + if err == nil { + return enabled + } + } + + return limitChecksEnabled +} diff --git a/pkg/watch/notify.go b/pkg/watch/notify.go new file mode 100644 index 000000000..efc024139 --- /dev/null +++ b/pkg/watch/notify.go @@ -0,0 +1,10 @@ +package watch + +import "github.com/windmilleng/fsnotify" + +type Notify interface { + Close() error + Add(name string) error + Events() chan fsnotify.Event + Errors() chan error +} diff --git a/pkg/watch/notify_test.go b/pkg/watch/notify_test.go new file mode 100644 index 000000000..b38e72cd7 --- /dev/null +++ b/pkg/watch/notify_test.go @@ -0,0 +1,420 @@ +package watch + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strings" + "testing" + "time" + + "github.com/windmilleng/fsnotify" +) + +// Each implementation of the notify interface should have the same basic +// behavior. + +func TestNoEvents(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + f.fsync() + f.assertEvents() +} + +func TestEventOrdering(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + + count := 8 + dirs := make([]string, count) + for i, _ := range dirs { + dir, err := f.root.NewDir("watched") + if err != nil { + t.Fatal(err) + } + dirs[i] = dir.Path() + err = f.notify.Add(dir.Path()) + if err != nil { + t.Fatal(err) + } + } + + f.fsync() + f.events = nil + + var expected []fsnotify.Event + for i, dir := range dirs { + base := fmt.Sprintf("%d.txt", i) + p := filepath.Join(dir, base) + err := ioutil.WriteFile(p, []byte(base), os.FileMode(0777)) + if err != nil { + t.Fatal(err) + } + expected = append(expected, create(filepath.Join(dir, base))) + } + + f.fsync() + + f.filterJustCreateEvents() + f.assertEvents(expected...) + + // Check to make sure that the files appeared in the right order. + createEvents := make([]fsnotify.Event, 0, count) + for _, e := range f.events { + if e.Op == fsnotify.Create { + createEvents = append(createEvents, e) + } + } + + if len(createEvents) != count { + t.Fatalf("Expected %d create events. Actual: %+v", count, createEvents) + } + + for i, event := range createEvents { + base := fmt.Sprintf("%d.txt", i) + p := filepath.Join(dirs[i], base) + if event.Name != p { + t.Fatalf("Expected event %q at %d. Actual: %+v", base, i, createEvents) + } + } +} + +func TestWatchesAreRecursive(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + + root, err := f.root.NewDir("root") + if err != nil { + t.Fatal(err) + } + + // add a sub directory + subPath := filepath.Join(root.Path(), "sub") + os.MkdirAll(subPath, os.ModePerm) + + // watch parent + err = f.notify.Add(root.Path()) + if err != nil { + t.Fatal(err) + } + + f.fsync() + f.events = nil + // change sub directory + changeFilePath := filepath.Join(subPath, "change") + _, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) + if err != nil { + t.Fatal(err) + } + + // we should get notified + f.fsync() + + f.assertEvents(create(changeFilePath)) +} + +func TestNewDirectoriesAreRecursivelyWatched(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + + root, err := f.root.NewDir("root") + if err != nil { + t.Fatal(err) + } + + // watch parent + err = f.notify.Add(root.Path()) + if err != nil { + t.Fatal(err) + } + f.fsync() + f.events = nil + // add a sub directory + subPath := filepath.Join(root.Path(), "sub") + os.MkdirAll(subPath, os.ModePerm) + // change something inside sub directory + changeFilePath := filepath.Join(subPath, "change") + _, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) + if err != nil { + t.Fatal(err) + } + // we should get notified + f.fsync() + // assert events + f.assertEvents(create(subPath), create(changeFilePath)) +} + +func TestWatchNonExistentPath(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + + root, err := f.root.NewDir("root") + if err != nil { + t.Fatal(err) + } + + path := filepath.Join(root.Path(), "change") + + err = f.notify.Add(path) + if err != nil { + t.Fatal(err) + } + d1 := []byte("hello\ngo\n") + err = ioutil.WriteFile(path, d1, 0644) + if err != nil { + t.Fatal(err) + } + f.fsync() + if runtime.GOOS == "darwin" { + f.assertEvents(create(path)) + } else { + f.assertEvents(create(path), write(path)) + } +} + +func TestRemove(t *testing.T) { + f := newNotifyFixture(t) + defer f.tearDown() + + root, err := f.root.NewDir("root") + if err != nil { + t.Fatal(err) + } + + path := filepath.Join(root.Path(), "change") + + if err != nil { + t.Fatal(err) + } + d1 := []byte("hello\ngo\n") + err = ioutil.WriteFile(path, d1, 0644) + if err != nil { + t.Fatal(err) + } + err = f.notify.Add(path) + if err != nil { + t.Fatal(err) + } + f.fsync() + f.events = nil + err = os.Remove(path) + if err != nil { + t.Fatal(err) + } + f.fsync() + + f.assertEvents(remove(path)) +} + +func TestRemoveAndAddBack(t *testing.T) { + t.Skip("Skipping broken test for now") + f := newNotifyFixture(t) + defer f.tearDown() + + root, err := f.root.NewDir("root") + if err != nil { + t.Fatal(err) + } + + path := filepath.Join(root.Path(), "change") + + if err != nil { + t.Fatal(err) + } + d1 := []byte("hello\ngo\n") + err = ioutil.WriteFile(path, d1, 0644) + if err != nil { + t.Fatal(err) + } + err = f.notify.Add(path) + if err != nil { + t.Fatal(err) + } + err = os.Remove(path) + if err != nil { + t.Fatal(err) + } + f.fsync() + + f.assertEvents(remove(path)) + f.events = nil + + err = ioutil.WriteFile(path, d1, 0644) + if err != nil { + t.Fatal(err) + } + + f.assertEvents(create(path)) +} + +func TestSingleFile(t *testing.T) { + if runtime.GOOS != "darwin" { + t.Skip("Broken on Linux") + } + f := newNotifyFixture(t) + defer f.tearDown() + + root, err := f.root.NewDir("root") + if err != nil { + t.Fatal(err) + } + + path := filepath.Join(root.Path(), "change") + + if err != nil { + t.Fatal(err) + } + d1 := []byte("hello\ngo\n") + err = ioutil.WriteFile(path, d1, 0644) + if err != nil { + t.Fatal(err) + } + + err = f.notify.Add(path) + if err != nil { + t.Fatal(err) + } + + d2 := []byte("hello\nworld\n") + err = ioutil.WriteFile(path, d2, 0644) + if err != nil { + t.Fatal(err) + } + f.fsync() + + f.assertEvents(create(path)) +} + +type notifyFixture struct { + t *testing.T + root *TempDir + watched *TempDir + notify Notify + events []fsnotify.Event +} + +func newNotifyFixture(t *testing.T) *notifyFixture { + SetLimitChecksEnabled(false) + notify, err := NewWatcher() + if err != nil { + t.Fatal(err) + } + + root, err := NewDir(t.Name()) + if err != nil { + t.Fatal(err) + } + + watched, err := root.NewDir("watched") + if err != nil { + t.Fatal(err) + } + + err = notify.Add(watched.Path()) + if err != nil { + t.Fatal(err) + } + return ¬ifyFixture{ + t: t, + root: root, + watched: watched, + notify: notify, + } +} + +func (f *notifyFixture) filterJustCreateEvents() { + var r []fsnotify.Event + + for _, ev := range f.events { + if ev.Op != fsnotify.Create { + continue + } + r = append(r, ev) + } + + f.events = r +} + +func (f *notifyFixture) assertEvents(expected ...fsnotify.Event) { + if len(f.events) != len(expected) { + f.t.Fatalf("Got %d events (expected %d): %v %v", len(f.events), len(expected), f.events, expected) + } + + for i, actual := range f.events { + if actual != expected[i] { + f.t.Fatalf("Got event %v (expected %v)", actual, expected[i]) + } + } +} + +func create(f string) fsnotify.Event { + return fsnotify.Event{ + Name: f, + Op: fsnotify.Create, + } +} + +func write(f string) fsnotify.Event { + return fsnotify.Event{ + Name: f, + Op: fsnotify.Write, + } +} + +func remove(f string) fsnotify.Event { + return fsnotify.Event{ + Name: f, + Op: fsnotify.Remove, + } +} + +func (f *notifyFixture) fsync() { + syncPathBase := fmt.Sprintf("sync-%d.txt", time.Now().UnixNano()) + syncPath := filepath.Join(f.watched.Path(), syncPathBase) + anySyncPath := filepath.Join(f.watched.Path(), "sync-") + timeout := time.After(time.Second) + + err := ioutil.WriteFile(syncPath, []byte(fmt.Sprintf("%s", time.Now())), os.FileMode(0777)) + if err != nil { + f.t.Fatal(err) + } + +F: + for { + select { + case err := <-f.notify.Errors(): + f.t.Fatal(err) + + case event := <-f.notify.Events(): + if strings.Contains(event.Name, syncPath) { + break F + } + if strings.Contains(event.Name, anySyncPath) { + continue + } + f.events = append(f.events, event) + + case <-timeout: + f.t.Fatalf("fsync: timeout") + } + } + + if err != nil { + f.t.Fatal(err) + } +} + +func (f *notifyFixture) tearDown() { + SetLimitChecksEnabled(true) + err := f.root.TearDown() + if err != nil { + f.t.Fatal(err) + } + + err = f.notify.Close() + if err != nil { + f.t.Fatal(err) + } +} diff --git a/pkg/watch/ospath.go b/pkg/watch/ospath.go new file mode 100644 index 000000000..e80b6dd2d --- /dev/null +++ b/pkg/watch/ospath.go @@ -0,0 +1,24 @@ +package watch + +import ( + "os" + "path/filepath" + "strings" +) + +func pathIsChildOf(path string, parent string) bool { + relPath, err := filepath.Rel(parent, path) + if err != nil { + return true + } + + if relPath == "." { + return true + } + + if filepath.IsAbs(relPath) || strings.HasPrefix(relPath, ".."+string(os.PathSeparator)) { + return false + } + + return true +} diff --git a/pkg/watch/temp.go b/pkg/watch/temp.go new file mode 100644 index 000000000..1573edf94 --- /dev/null +++ b/pkg/watch/temp.go @@ -0,0 +1,73 @@ +package watch + +import ( + "io/ioutil" + "os" + "path/filepath" +) + +// TempDir holds a temp directory and allows easy access to new temp directories. +type TempDir struct { + dir string +} + +// NewDir creates a new TempDir in the default location (typically $TMPDIR) +func NewDir(prefix string) (*TempDir, error) { + return NewDirAtRoot("", prefix) +} + +// NewDir creates a new TempDir at the given root. +func NewDirAtRoot(root, prefix string) (*TempDir, error) { + tmpDir, err := ioutil.TempDir(root, prefix) + if err != nil { + return nil, err + } + + realTmpDir, err := filepath.EvalSymlinks(tmpDir) + if err != nil { + return nil, err + } + + return &TempDir{dir: realTmpDir}, nil +} + +// NewDirAtSlashTmp creates a new TempDir at /tmp +func NewDirAtSlashTmp(prefix string) (*TempDir, error) { + fullyResolvedPath, err := filepath.EvalSymlinks("/tmp") + if err != nil { + return nil, err + } + return NewDirAtRoot(fullyResolvedPath, prefix) +} + +// d.NewDir creates a new TempDir under d +func (d *TempDir) NewDir(prefix string) (*TempDir, error) { + d2, err := ioutil.TempDir(d.dir, prefix) + if err != nil { + return nil, err + } + return &TempDir{d2}, nil +} + +func (d *TempDir) NewDeterministicDir(name string) (*TempDir, error) { + d2 := filepath.Join(d.dir, name) + err := os.Mkdir(d2, 0700) + if os.IsExist(err) { + return nil, err + } else if err != nil { + return nil, err + } + return &TempDir{d2}, nil +} + +func (d *TempDir) TearDown() error { + return os.RemoveAll(d.dir) +} + +func (d *TempDir) Path() string { + return d.dir +} + +// Possible extensions: +// temp file +// named directories or files (e.g., we know we want one git repo for our object, but it should be temporary) diff --git a/pkg/watch/watcher_darwin.go b/pkg/watch/watcher_darwin.go new file mode 100644 index 000000000..624d5ffd6 --- /dev/null +++ b/pkg/watch/watcher_darwin.go @@ -0,0 +1,156 @@ +package watch + +import ( + "path/filepath" + "sync" + "time" + + "github.com/windmilleng/fsevents" + "github.com/windmilleng/fsnotify" +) + +type darwinNotify struct { + stream *fsevents.EventStream + events chan fsnotify.Event + errors chan error + stop chan struct{} + + // TODO(nick): This mutex is needed for the case where we add paths after we + // start watching. But because fsevents supports recursive watches, we don't + // actually need this feature. We should change the api contract of wmNotify + // so that, for recursive watches, we can guarantee that the path list doesn't + // change. + sm *sync.Mutex +} + +func (d *darwinNotify) isTrackingPath(path string) bool { + d.sm.Lock() + defer d.sm.Unlock() + for _, p := range d.stream.Paths { + if p == path { + return true + } + } + return false +} + +func (d *darwinNotify) loop() { + lastCreate := "" + ignoredSpuriousEvent := false + for { + select { + case <-d.stop: + return + case events, ok := <-d.stream.Events: + if !ok { + return + } + + for _, e := range events { + e.Path = filepath.Join("/", e.Path) + op := eventFlagsToOp(e.Flags) + + // Sometimes we get duplicate CREATE events. + // + // This is exercised by TestEventOrdering, which creates lots of files + // and generates duplicate CREATE events for some of them. + if op == fsnotify.Create { + if lastCreate == e.Path { + continue + } + lastCreate = e.Path + } + + // ignore the first event that says the watched directory + // has been created. these are fired spuriously on initiation. + if op == fsnotify.Create { + if d.isTrackingPath(e.Path) && !ignoredSpuriousEvent { + ignoredSpuriousEvent = true + continue + } + } + + d.events <- fsnotify.Event{ + Name: e.Path, + Op: op, + } + } + } + } +} + +func (d *darwinNotify) Add(name string) error { + d.sm.Lock() + defer d.sm.Unlock() + + es := d.stream + + // Check if this is a subdirectory of any of the paths + // we're already watching. + for _, parent := range es.Paths { + isChild := pathIsChildOf(name, parent) + if isChild { + return nil + } + } + + es.Paths = append(es.Paths, name) + if len(es.Paths) == 1 { + go d.loop() + es.Start() + } else { + es.Restart() + } + + return nil +} + +func (d *darwinNotify) Close() error { + d.sm.Lock() + defer d.sm.Unlock() + + d.stream.Stop() + close(d.errors) + close(d.stop) + + return nil +} + +func (d *darwinNotify) Events() chan fsnotify.Event { + return d.events +} + +func (d *darwinNotify) Errors() chan error { + return d.errors +} + +func NewWatcher() (Notify, error) { + dw := &darwinNotify{ + stream: &fsevents.EventStream{ + Latency: 1 * time.Millisecond, + Flags: fsevents.FileEvents, + }, + sm: &sync.Mutex{}, + events: make(chan fsnotify.Event), + errors: make(chan error), + stop: make(chan struct{}), + } + + return dw, nil +} + +func eventFlagsToOp(flags fsevents.EventFlags) fsnotify.Op { + if flags&fsevents.ItemRemoved != 0 { + return fsnotify.Remove + } + if flags&fsevents.ItemRenamed != 0 { + return fsnotify.Rename + } + if flags&fsevents.ItemChangeOwner != 0 { + return fsnotify.Chmod + } + if flags&fsevents.ItemCreated != 0 { + return fsnotify.Create + } + return fsnotify.Write +} diff --git a/pkg/watch/watcher_linux.go b/pkg/watch/watcher_linux.go new file mode 100644 index 000000000..e3fc09910 --- /dev/null +++ b/pkg/watch/watcher_linux.go @@ -0,0 +1,173 @@ +package watch + +import ( + "io/ioutil" + "log" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/windmilleng/fsnotify" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +const enospc = "no space left on device" +const inotifyErrMsg = "The user limit on the total number of inotify watches was reached; increase the fs.inotify.max_user_watches sysctl. See here for more information: https://facebook.github.io/watchman/docs/install.html#linux-inotify-limits" +const inotifyMin = 8192 + +type linuxNotify struct { + watcher *fsnotify.Watcher + events chan fsnotify.Event + wrappedEvents chan fsnotify.Event + errors chan error + watchList map[string]bool +} + +func (d *linuxNotify) Add(name string) error { + fi, err := os.Stat(name) + if err != nil && !os.IsNotExist(err) { + return err + } + // if it's a file that doesn't exist watch it's parent + if os.IsNotExist(err) { + parent := filepath.Join(name, "..") + err = d.watcher.Add(parent) + if err != nil { + return err + } + d.watchList[parent] = true + } else if fi.IsDir() { + err = d.watchRecursively(name) + if err != nil { + return err + } + d.watchList[name] = true + } else { + err = d.watcher.Add(name) + if err != nil { + return err + } + d.watchList[name] = true + } + + return nil +} + +func (d *linuxNotify) watchRecursively(dir string) error { + return filepath.Walk(dir, func(path string, mode os.FileInfo, err error) error { + if err != nil { + return err + } + + return d.watcher.Add(path) + }) +} + +func (d *linuxNotify) Close() error { + return d.watcher.Close() +} + +func (d *linuxNotify) Events() chan fsnotify.Event { + return d.wrappedEvents +} + +func (d *linuxNotify) Errors() chan error { + return d.errors +} + +func (d *linuxNotify) loop() { + for e := range d.events { + if e.Op&fsnotify.Create == fsnotify.Create && isDir(e.Name) { + err := filepath.Walk(e.Name, func(path string, mode os.FileInfo, err error) error { + if err != nil { + return err + } + newE := fsnotify.Event{ + Op: fsnotify.Create, + Name: path, + } + d.sendEventIfWatched(newE) + // TODO(dmiller): symlinks 😭 + err = d.Add(path) + if err != nil { + log.Printf("Error watching path %s: %s", e.Name, err) + } + return nil + }) + if err != nil { + log.Printf("Error walking directory %s: %s", e.Name, err) + } + } else { + d.sendEventIfWatched(e) + } + } +} + +func (d *linuxNotify) sendEventIfWatched(e fsnotify.Event) { + if _, ok := d.watchList[e.Name]; ok { + d.wrappedEvents <- e + } else { + // TODO(dmiller): maybe use a prefix tree here? + for path := range d.watchList { + if pathIsChildOf(e.Name, path) { + d.wrappedEvents <- e + break + } + } + } +} + +func NewWatcher() (*linuxNotify, error) { + fsw, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + wrappedEvents := make(chan fsnotify.Event) + + wmw := &linuxNotify{ + watcher: fsw, + events: fsw.Events, + wrappedEvents: wrappedEvents, + errors: fsw.Errors, + watchList: map[string]bool{}, + } + + go wmw.loop() + + return wmw, nil +} + +func isDir(pth string) bool { + fi, _ := os.Stat(pth) + + return fi.IsDir() +} + +func checkInotifyLimits() error { + if !LimitChecksEnabled() { + return nil + } + + data, err := ioutil.ReadFile("/proc/sys/fs/inotify/max_user_watches") + if err != nil { + return err + } + + i, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil { + return err + } + + if i < inotifyMin { + return grpc.Errorf( + codes.ResourceExhausted, + "The user limit on the total number of inotify watches is too low (%d); increase the fs.inotify.max_user_watches sysctl. See here for more information: https://facebook.github.io/watchman/docs/install.html#linux-inotify-limits", + i, + ) + } + + return nil +}