watch: change the watcher interface to better match how we actually use it (#1835)

This commit is contained in:
Nick Santos 2019-07-11 11:40:40 -04:00 committed by Nicolas De loof
parent b5ccea7b0e
commit 7f6e189dbc
4 changed files with 168 additions and 156 deletions

View File

@ -1,12 +1,49 @@
package watch package watch
import "github.com/windmilleng/tilt/internal/logger"
type FileEvent struct { type FileEvent struct {
Path string Path string
} }
type Notify interface { type Notify interface {
// Start watching the paths set at init time
Start() error
// Stop watching and close all channels
Close() error Close() error
Add(name string) error
// A channel to read off incoming file changes
Events() chan FileEvent Events() chan FileEvent
// A channel to read off show-stopping errors
Errors() chan error Errors() chan error
} }
// When we specify directories to watch, we often want to
// ignore some subset of the files under those directories.
//
// For example:
// - Watch /src/repo, but ignore /src/repo/.git
// - Watch /src/repo, but ignore everything in /src/repo/bazel-bin except /src/repo/bazel-bin/app-binary
//
// The PathMatcher inteface helps us manage these ignores.
// By design, fileutils.PatternMatcher (the interface that implements dockerignore)
// satisfies this interface
// https://godoc.org/github.com/docker/docker/pkg/fileutils#PatternMatcher
type PathMatcher interface {
Matches(file string) (bool, error)
Exclusions() bool
}
type EmptyMatcher struct {
}
func (EmptyMatcher) Matches(f string) (bool, error) { return false, nil }
func (EmptyMatcher) Exclusions() bool { return false }
var _ PathMatcher = EmptyMatcher{}
func NewWatcher(paths []string, ignore PathMatcher, l logger.Logger) (Notify, error) {
return newWatcher(paths, ignore, l)
}

View File

@ -36,10 +36,7 @@ func TestEventOrdering(t *testing.T) {
for i, _ := range dirs { for i, _ := range dirs {
dir := f.TempDir("watched") dir := f.TempDir("watched")
dirs[i] = dir dirs[i] = dir
err := f.notify.Add(dir) f.watch(dir)
if err != nil {
t.Fatal(err)
}
} }
f.fsync() f.fsync()
@ -71,10 +68,7 @@ func TestGitBranchSwitch(t *testing.T) {
for i, _ := range dirs { for i, _ := range dirs {
dir := f.TempDir("watched") dir := f.TempDir("watched")
dirs[i] = dir dirs[i] = dir
err := f.notify.Add(dir) f.watch(dir)
if err != nil {
t.Fatal(err)
}
} }
f.fsync() f.fsync()
@ -129,16 +123,13 @@ func TestWatchesAreRecursive(t *testing.T) {
f.MkdirAll(subPath) f.MkdirAll(subPath)
// watch parent // watch parent
err := f.notify.Add(root) f.watch(root)
if err != nil {
t.Fatal(err)
}
f.fsync() f.fsync()
f.events = nil f.events = nil
// change sub directory // change sub directory
changeFilePath := filepath.Join(subPath, "change") changeFilePath := filepath.Join(subPath, "change")
_, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) _, err := os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -153,10 +144,7 @@ func TestNewDirectoriesAreRecursivelyWatched(t *testing.T) {
root := f.TempDir("root") root := f.TempDir("root")
// watch parent // watch parent
err := f.notify.Add(root) f.watch(root)
if err != nil {
t.Fatal(err)
}
f.fsync() f.fsync()
f.events = nil f.events = nil
@ -166,7 +154,7 @@ func TestNewDirectoriesAreRecursivelyWatched(t *testing.T) {
// change something inside sub directory // change something inside sub directory
changeFilePath := filepath.Join(subPath, "change") changeFilePath := filepath.Join(subPath, "change")
_, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666) _, err := os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -180,11 +168,7 @@ func TestWatchNonExistentPath(t *testing.T) {
root := f.TempDir("root") root := f.TempDir("root")
path := filepath.Join(root, "change") path := filepath.Join(root, "change")
err := f.notify.Add(path) f.watch(path)
if err != nil {
t.Fatal(err)
}
f.fsync() f.fsync()
d1 := "hello\ngo\n" d1 := "hello\ngo\n"
@ -200,11 +184,7 @@ func TestWatchNonExistentPathDoesNotFireSiblingEvent(t *testing.T) {
watchedFile := filepath.Join(root, "a.txt") watchedFile := filepath.Join(root, "a.txt")
unwatchedSibling := filepath.Join(root, "b.txt") unwatchedSibling := filepath.Join(root, "b.txt")
err := f.notify.Add(watchedFile) f.watch(watchedFile)
if err != nil {
t.Fatal(err)
}
f.fsync() f.fsync()
d1 := "hello\ngo\n" d1 := "hello\ngo\n"
@ -222,13 +202,10 @@ func TestRemove(t *testing.T) {
d1 := "hello\ngo\n" d1 := "hello\ngo\n"
f.WriteFile(path, d1) f.WriteFile(path, d1)
err := f.notify.Add(path) f.watch(path)
if err != nil {
t.Fatal(err)
}
f.fsync() f.fsync()
f.events = nil f.events = nil
err = os.Remove(path) err := os.Remove(path)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -239,17 +216,14 @@ func TestRemoveAndAddBack(t *testing.T) {
f := newNotifyFixture(t) f := newNotifyFixture(t)
defer f.tearDown() defer f.tearDown()
path := filepath.Join(f.watched, "change") path := filepath.Join(f.paths[0], "change")
d1 := []byte("hello\ngo\n") d1 := []byte("hello\ngo\n")
err := ioutil.WriteFile(path, d1, 0644) err := ioutil.WriteFile(path, d1, 0644)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = f.notify.Add(path) f.watch(path)
if err != nil {
t.Fatal(err)
}
f.assertEvents(path) f.assertEvents(path)
err = os.Remove(path) err = os.Remove(path)
@ -278,14 +252,11 @@ func TestSingleFile(t *testing.T) {
d1 := "hello\ngo\n" d1 := "hello\ngo\n"
f.WriteFile(path, d1) f.WriteFile(path, d1)
err := f.notify.Add(path) f.watch(path)
if err != nil {
t.Fatal(err)
}
f.fsync() f.fsync()
d2 := []byte("hello\nworld\n") d2 := []byte("hello\nworld\n")
err = ioutil.WriteFile(path, d2, 0644) err := ioutil.WriteFile(path, d2, 0644)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -296,8 +267,8 @@ func TestWriteBrokenLink(t *testing.T) {
f := newNotifyFixture(t) f := newNotifyFixture(t)
defer f.tearDown() defer f.tearDown()
link := filepath.Join(f.watched, "brokenLink") link := filepath.Join(f.paths[0], "brokenLink")
missingFile := filepath.Join(f.watched, "missingFile") missingFile := filepath.Join(f.paths[0], "missingFile")
err := os.Symlink(missingFile, link) err := os.Symlink(missingFile, link)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -310,13 +281,13 @@ func TestWriteGoodLink(t *testing.T) {
f := newNotifyFixture(t) f := newNotifyFixture(t)
defer f.tearDown() defer f.tearDown()
goodFile := filepath.Join(f.watched, "goodFile") goodFile := filepath.Join(f.paths[0], "goodFile")
err := ioutil.WriteFile(goodFile, []byte("hello"), 0644) err := ioutil.WriteFile(goodFile, []byte("hello"), 0644)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
link := filepath.Join(f.watched, "goodFileSymlink") link := filepath.Join(f.paths[0], "goodFileSymlink")
err = os.Symlink(goodFile, link) err = os.Symlink(goodFile, link)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -342,11 +313,7 @@ func TestWatchBrokenLink(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = f.notify.Add(newRoot.Path()) f.watch(newRoot.Path())
if err != nil {
t.Fatal(err)
}
os.Remove(link) os.Remove(link)
f.assertEvents(link) f.assertEvents(link)
} }
@ -359,15 +326,11 @@ func TestMoveAndReplace(t *testing.T) {
file := filepath.Join(root, "myfile") file := filepath.Join(root, "myfile")
f.WriteFile(file, "hello") f.WriteFile(file, "hello")
err := f.notify.Add(file) f.watch(file)
if err != nil {
t.Fatal(err)
}
tmpFile := filepath.Join(root, ".myfile.swp") tmpFile := filepath.Join(root, ".myfile.swp")
f.WriteFile(tmpFile, "world") f.WriteFile(tmpFile, "world")
err = os.Rename(tmpFile, file) err := os.Rename(tmpFile, file)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -478,37 +441,40 @@ func TestWatchNonexistentDirectory(t *testing.T) {
type notifyFixture struct { type notifyFixture struct {
out *bytes.Buffer out *bytes.Buffer
*tempdir.TempDirFixture *tempdir.TempDirFixture
notify Notify notify Notify
watched string paths []string
events []FileEvent events []FileEvent
} }
func newNotifyFixture(t *testing.T) *notifyFixture { func newNotifyFixture(t *testing.T) *notifyFixture {
out := bytes.NewBuffer(nil) out := bytes.NewBuffer(nil)
notify, err := NewWatcher(logger.NewLogger(logger.DebugLvl, out)) nf := &notifyFixture{
if err != nil { TempDirFixture: tempdir.NewTempDirFixture(t),
t.Fatal(err) paths: []string{},
}
f := tempdir.NewTempDirFixture(t)
watched := f.TempDir("watched")
err = notify.Add(watched)
if err != nil {
t.Fatal(err)
}
return &notifyFixture{
TempDirFixture: f,
watched: watched,
notify: notify,
out: out, out: out,
} }
nf.watch(nf.TempDir("watched"))
return nf
} }
func (f *notifyFixture) watch(path string) { func (f *notifyFixture) watch(path string) {
err := f.notify.Add(path) f.paths = append(f.paths, path)
// sync any outstanding events and close the old watcher
if f.notify != nil {
f.fsync()
f.closeWatcher()
}
// create a new watcher
notify, err := NewWatcher(f.paths, EmptyMatcher{}, logger.NewLogger(logger.DebugLvl, f.out))
if err != nil { if err != nil {
f.T().Fatalf("notify.Add: %s", path) f.T().Fatal(err)
}
f.notify = notify
err = f.notify.Start()
if err != nil {
f.T().Fatal(err)
} }
} }
@ -548,8 +514,8 @@ func (f *notifyFixture) consumeEventsInBackground(ctx context.Context) chan erro
func (f *notifyFixture) fsync() { func (f *notifyFixture) fsync() {
syncPathBase := fmt.Sprintf("sync-%d.txt", time.Now().UnixNano()) syncPathBase := fmt.Sprintf("sync-%d.txt", time.Now().UnixNano())
syncPath := filepath.Join(f.watched, syncPathBase) syncPath := filepath.Join(f.paths[0], syncPathBase)
anySyncPath := filepath.Join(f.watched, "sync-") anySyncPath := filepath.Join(f.paths[0], "sync-")
timeout := time.After(time.Second) timeout := time.After(time.Second)
f.WriteFile(syncPath, fmt.Sprintf("%s", time.Now())) f.WriteFile(syncPath, fmt.Sprintf("%s", time.Now()))
@ -582,21 +548,25 @@ F:
} }
} }
func (f *notifyFixture) tearDown() { func (f *notifyFixture) closeWatcher() {
err := f.notify.Close() notify := f.notify
err := notify.Close()
if err != nil { if err != nil {
f.T().Fatal(err) f.T().Fatal(err)
} }
// drain channels from watcher // drain channels from watcher
go func() { go func() {
for _ = range f.notify.Events() { for _ = range notify.Events() {
} }
}() }()
go func() { go func() {
for _ = range f.notify.Errors() { for _ = range notify.Errors() {
} }
}() }()
}
func (f *notifyFixture) tearDown() {
f.closeWatcher()
f.TempDirFixture.TearDown() f.TempDirFixture.TearDown()
} }

View File

@ -2,7 +2,6 @@ package watch
import ( import (
"path/filepath" "path/filepath"
"sync"
"time" "time"
"github.com/windmilleng/tilt/internal/logger" "github.com/windmilleng/tilt/internal/logger"
@ -19,14 +18,9 @@ type darwinNotify struct {
errors chan error errors chan error
stop chan struct{} stop chan struct{}
// TODO(nick): This mutex is needed for the case where we add paths after we
// start watching. But because fsevents supports recursive watches, we don't
// actually need this feature. We should change the api contract of wmNotify
// so that, for recursive watches, we can guarantee that the path list doesn't
// change.
sm *sync.Mutex
pathsWereWatching map[string]interface{} pathsWereWatching map[string]interface{}
ignore PathMatcher
logger logger.Logger
sawAnyHistoryDone bool sawAnyHistoryDone bool
} }
@ -44,9 +38,7 @@ func (d *darwinNotify) loop() {
e.Path = filepath.Join("/", e.Path) e.Path = filepath.Join("/", e.Path)
if e.Flags&fsevents.HistoryDone == fsevents.HistoryDone { if e.Flags&fsevents.HistoryDone == fsevents.HistoryDone {
d.sm.Lock()
d.sawAnyHistoryDone = true d.sawAnyHistoryDone = true
d.sm.Unlock()
continue continue
} }
@ -63,6 +55,13 @@ func (d *darwinNotify) loop() {
continue continue
} }
ignore, err := d.ignore.Matches(e.Path)
if err != nil {
d.logger.Infof("Error matching path %q: %v", e.Path, err)
} else if ignore {
continue
}
d.events <- FileEvent{ d.events <- FileEvent{
Path: e.Path, Path: e.Path,
} }
@ -71,41 +70,33 @@ func (d *darwinNotify) loop() {
} }
} }
func (d *darwinNotify) Add(name string) error { // Add a path to be watched. Should only be called during initialization.
d.sm.Lock() func (d *darwinNotify) initAdd(name string) {
defer d.sm.Unlock()
es := d.stream
// Check if this is a subdirectory of any of the paths // Check if this is a subdirectory of any of the paths
// we're already watching. // we're already watching.
for _, parent := range es.Paths { for _, parent := range d.stream.Paths {
if ospath.IsChild(parent, name) { if ospath.IsChild(parent, name) {
return nil return
} }
} }
es.Paths = append(es.Paths, name) d.stream.Paths = append(d.stream.Paths, name)
if d.pathsWereWatching == nil { if d.pathsWereWatching == nil {
d.pathsWereWatching = make(map[string]interface{}) d.pathsWereWatching = make(map[string]interface{})
} }
d.pathsWereWatching[name] = struct{}{} d.pathsWereWatching[name] = struct{}{}
}
if len(es.Paths) == 1 { func (d *darwinNotify) Start() error {
es.Start() d.stream.Start()
go d.loop()
} else { go d.loop()
es.Restart()
}
return nil return nil
} }
func (d *darwinNotify) Close() error { func (d *darwinNotify) Close() error {
d.sm.Lock()
defer d.sm.Unlock()
d.stream.Stop() d.stream.Stop()
close(d.errors) close(d.errors)
close(d.stop) close(d.stop)
@ -121,8 +112,10 @@ func (d *darwinNotify) Errors() chan error {
return d.errors return d.errors
} }
func NewWatcher(l logger.Logger) (Notify, error) { func newWatcher(paths []string, ignore PathMatcher, l logger.Logger) (*darwinNotify, error) {
dw := &darwinNotify{ dw := &darwinNotify{
ignore: ignore,
logger: l,
stream: &fsevents.EventStream{ stream: &fsevents.EventStream{
Latency: 1 * time.Millisecond, Latency: 1 * time.Millisecond,
Flags: fsevents.FileEvents, Flags: fsevents.FileEvents,
@ -130,12 +123,15 @@ func NewWatcher(l logger.Logger) (Notify, error) {
// https://developer.apple.com/documentation/coreservices/1443980-fseventstreamcreate // https://developer.apple.com/documentation/coreservices/1443980-fseventstreamcreate
EventID: fsevents.LatestEventID(), EventID: fsevents.LatestEventID(),
}, },
sm: &sync.Mutex{},
events: make(chan FileEvent), events: make(chan FileEvent),
errors: make(chan error), errors: make(chan error),
stop: make(chan struct{}), stop: make(chan struct{}),
} }
for _, path := range paths {
dw.initAdd(path)
}
return dw, nil return dw, nil
} }

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/windmilleng/fsnotify" "github.com/windmilleng/fsnotify"
@ -21,51 +20,51 @@ import (
// //
// All OS-specific codepaths are handled by fsnotify. // All OS-specific codepaths are handled by fsnotify.
type naiveNotify struct { type naiveNotify struct {
log logger.Logger
watcher *fsnotify.Watcher
events chan fsnotify.Event
wrappedEvents chan FileEvent
errors chan error
mu sync.Mutex
// Paths that we're watching that should be passed up to the caller. // Paths that we're watching that should be passed up to the caller.
// Note that we may have to watch ancestors of these paths // Note that we may have to watch ancestors of these paths
// in order to fulfill the API promise. // in order to fulfill the API promise.
notifyList map[string]bool notifyList map[string]bool
ignore PathMatcher
log logger.Logger
watcher *fsnotify.Watcher
events chan fsnotify.Event
wrappedEvents chan FileEvent
errors chan error
} }
var ( var (
numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches") numberOfWatches = expvar.NewInt("watch.naive.numberOfWatches")
) )
func (d *naiveNotify) Add(name string) error { func (d *naiveNotify) Start() error {
fi, err := os.Stat(name) for name := range d.notifyList {
if err != nil && !os.IsNotExist(err) { fi, err := os.Stat(name)
return errors.Wrapf(err, "notify.Add(%q)", name) if err != nil && !os.IsNotExist(err) {
}
// if it's a file that doesn't exist, watch its parent
if os.IsNotExist(err) {
err = d.watchAncestorOfMissingPath(name)
if err != nil {
return errors.Wrapf(err, "watchAncestorOfMissingPath(%q)", name)
}
} else if fi.IsDir() {
err = d.watchRecursively(name)
if err != nil {
return errors.Wrapf(err, "notify.Add(%q)", name) return errors.Wrapf(err, "notify.Add(%q)", name)
} }
} else {
err = d.add(filepath.Dir(name)) // if it's a file that doesn't exist, watch its parent
if err != nil { if os.IsNotExist(err) {
return errors.Wrapf(err, "notify.Add(%q)", filepath.Dir(name)) err = d.watchAncestorOfMissingPath(name)
if err != nil {
return errors.Wrapf(err, "watchAncestorOfMissingPath(%q)", name)
}
} else if fi.IsDir() {
err = d.watchRecursively(name)
if err != nil {
return errors.Wrapf(err, "notify.Add(%q)", name)
}
} else {
err = d.add(filepath.Dir(name))
if err != nil {
return errors.Wrapf(err, "notify.Add(%q)", filepath.Dir(name))
}
} }
} }
d.mu.Lock() go d.loop()
defer d.mu.Unlock()
d.notifyList[name] = true
return nil return nil
} }
@ -123,10 +122,8 @@ func (d *naiveNotify) Errors() chan error {
func (d *naiveNotify) loop() { func (d *naiveNotify) loop() {
defer close(d.wrappedEvents) defer close(d.wrappedEvents)
for e := range d.events { for e := range d.events {
shouldNotify := d.shouldNotify(e.Name)
if e.Op&fsnotify.Create != fsnotify.Create { if e.Op&fsnotify.Create != fsnotify.Create {
if shouldNotify { if d.shouldNotify(e.Name) {
d.wrappedEvents <- FileEvent{e.Name} d.wrappedEvents <- FileEvent{e.Name}
} }
continue continue
@ -170,8 +167,13 @@ func (d *naiveNotify) loop() {
} }
func (d *naiveNotify) shouldNotify(path string) bool { func (d *naiveNotify) shouldNotify(path string) bool {
d.mu.Lock() ignore, err := d.ignore.Matches(path)
defer d.mu.Unlock() if err != nil {
d.log.Infof("Error matching path %q: %v", path, err)
} else if ignore {
return false
}
if _, ok := d.notifyList[path]; ok { if _, ok := d.notifyList[path]; ok {
return true return true
} }
@ -193,25 +195,32 @@ func (d *naiveNotify) add(path string) error {
return nil return nil
} }
func NewWatcher(l logger.Logger) (*naiveNotify, error) { func newWatcher(paths []string, ignore PathMatcher, l logger.Logger) (*naiveNotify, error) {
if ignore == nil {
return nil, fmt.Errorf("newWatcher: ignore is nil")
}
fsw, err := fsnotify.NewWatcher() fsw, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
return nil, err return nil, err
} }
wrappedEvents := make(chan FileEvent) wrappedEvents := make(chan FileEvent)
notifyList := make(map[string]bool, len(paths))
for _, path := range paths {
notifyList[path] = true
}
wmw := &naiveNotify{ wmw := &naiveNotify{
notifyList: notifyList,
ignore: ignore,
log: l, log: l,
watcher: fsw, watcher: fsw,
events: fsw.Events, events: fsw.Events,
wrappedEvents: wrappedEvents, wrappedEvents: wrappedEvents,
errors: fsw.Errors, errors: fsw.Errors,
notifyList: map[string]bool{},
} }
go wmw.loop()
return wmw, nil return wmw, nil
} }