tilt: copy watch code from tesseract

This commit is contained in:
Matt Landis 2018-08-16 14:53:47 -04:00 committed by Nicolas De loof
parent f24d3458c6
commit a755c84ea0
7 changed files with 883 additions and 0 deletions

27
pkg/watch/features.go Normal file
View File

@ -0,0 +1,27 @@
package watch
import (
"os"
"strconv"
)
const CheckLimitKey = "WM_CHECK_LIMIT"
var limitChecksEnabled = true
// Allows limit checks to be disabled for testing.
func SetLimitChecksEnabled(enabled bool) {
limitChecksEnabled = enabled
}
func LimitChecksEnabled() bool {
env, ok := os.LookupEnv(CheckLimitKey)
if ok {
enabled, err := strconv.ParseBool(env)
if err == nil {
return enabled
}
}
return limitChecksEnabled
}

10
pkg/watch/notify.go Normal file
View File

@ -0,0 +1,10 @@
package watch
import "github.com/windmilleng/fsnotify"
type Notify interface {
Close() error
Add(name string) error
Events() chan fsnotify.Event
Errors() chan error
}

420
pkg/watch/notify_test.go Normal file
View File

@ -0,0 +1,420 @@
package watch
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"github.com/windmilleng/fsnotify"
)
// Each implementation of the notify interface should have the same basic
// behavior.
func TestNoEvents(t *testing.T) {
f := newNotifyFixture(t)
defer f.tearDown()
f.fsync()
f.assertEvents()
}
func TestEventOrdering(t *testing.T) {
f := newNotifyFixture(t)
defer f.tearDown()
count := 8
dirs := make([]string, count)
for i, _ := range dirs {
dir, err := f.root.NewDir("watched")
if err != nil {
t.Fatal(err)
}
dirs[i] = dir.Path()
err = f.notify.Add(dir.Path())
if err != nil {
t.Fatal(err)
}
}
f.fsync()
f.events = nil
var expected []fsnotify.Event
for i, dir := range dirs {
base := fmt.Sprintf("%d.txt", i)
p := filepath.Join(dir, base)
err := ioutil.WriteFile(p, []byte(base), os.FileMode(0777))
if err != nil {
t.Fatal(err)
}
expected = append(expected, create(filepath.Join(dir, base)))
}
f.fsync()
f.filterJustCreateEvents()
f.assertEvents(expected...)
// Check to make sure that the files appeared in the right order.
createEvents := make([]fsnotify.Event, 0, count)
for _, e := range f.events {
if e.Op == fsnotify.Create {
createEvents = append(createEvents, e)
}
}
if len(createEvents) != count {
t.Fatalf("Expected %d create events. Actual: %+v", count, createEvents)
}
for i, event := range createEvents {
base := fmt.Sprintf("%d.txt", i)
p := filepath.Join(dirs[i], base)
if event.Name != p {
t.Fatalf("Expected event %q at %d. Actual: %+v", base, i, createEvents)
}
}
}
func TestWatchesAreRecursive(t *testing.T) {
f := newNotifyFixture(t)
defer f.tearDown()
root, err := f.root.NewDir("root")
if err != nil {
t.Fatal(err)
}
// add a sub directory
subPath := filepath.Join(root.Path(), "sub")
os.MkdirAll(subPath, os.ModePerm)
// watch parent
err = f.notify.Add(root.Path())
if err != nil {
t.Fatal(err)
}
f.fsync()
f.events = nil
// change sub directory
changeFilePath := filepath.Join(subPath, "change")
_, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil {
t.Fatal(err)
}
// we should get notified
f.fsync()
f.assertEvents(create(changeFilePath))
}
func TestNewDirectoriesAreRecursivelyWatched(t *testing.T) {
f := newNotifyFixture(t)
defer f.tearDown()
root, err := f.root.NewDir("root")
if err != nil {
t.Fatal(err)
}
// watch parent
err = f.notify.Add(root.Path())
if err != nil {
t.Fatal(err)
}
f.fsync()
f.events = nil
// add a sub directory
subPath := filepath.Join(root.Path(), "sub")
os.MkdirAll(subPath, os.ModePerm)
// change something inside sub directory
changeFilePath := filepath.Join(subPath, "change")
_, err = os.OpenFile(changeFilePath, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil {
t.Fatal(err)
}
// we should get notified
f.fsync()
// assert events
f.assertEvents(create(subPath), create(changeFilePath))
}
func TestWatchNonExistentPath(t *testing.T) {
f := newNotifyFixture(t)
defer f.tearDown()
root, err := f.root.NewDir("root")
if err != nil {
t.Fatal(err)
}
path := filepath.Join(root.Path(), "change")
err = f.notify.Add(path)
if err != nil {
t.Fatal(err)
}
d1 := []byte("hello\ngo\n")
err = ioutil.WriteFile(path, d1, 0644)
if err != nil {
t.Fatal(err)
}
f.fsync()
if runtime.GOOS == "darwin" {
f.assertEvents(create(path))
} else {
f.assertEvents(create(path), write(path))
}
}
func TestRemove(t *testing.T) {
f := newNotifyFixture(t)
defer f.tearDown()
root, err := f.root.NewDir("root")
if err != nil {
t.Fatal(err)
}
path := filepath.Join(root.Path(), "change")
if err != nil {
t.Fatal(err)
}
d1 := []byte("hello\ngo\n")
err = ioutil.WriteFile(path, d1, 0644)
if err != nil {
t.Fatal(err)
}
err = f.notify.Add(path)
if err != nil {
t.Fatal(err)
}
f.fsync()
f.events = nil
err = os.Remove(path)
if err != nil {
t.Fatal(err)
}
f.fsync()
f.assertEvents(remove(path))
}
func TestRemoveAndAddBack(t *testing.T) {
t.Skip("Skipping broken test for now")
f := newNotifyFixture(t)
defer f.tearDown()
root, err := f.root.NewDir("root")
if err != nil {
t.Fatal(err)
}
path := filepath.Join(root.Path(), "change")
if err != nil {
t.Fatal(err)
}
d1 := []byte("hello\ngo\n")
err = ioutil.WriteFile(path, d1, 0644)
if err != nil {
t.Fatal(err)
}
err = f.notify.Add(path)
if err != nil {
t.Fatal(err)
}
err = os.Remove(path)
if err != nil {
t.Fatal(err)
}
f.fsync()
f.assertEvents(remove(path))
f.events = nil
err = ioutil.WriteFile(path, d1, 0644)
if err != nil {
t.Fatal(err)
}
f.assertEvents(create(path))
}
func TestSingleFile(t *testing.T) {
if runtime.GOOS != "darwin" {
t.Skip("Broken on Linux")
}
f := newNotifyFixture(t)
defer f.tearDown()
root, err := f.root.NewDir("root")
if err != nil {
t.Fatal(err)
}
path := filepath.Join(root.Path(), "change")
if err != nil {
t.Fatal(err)
}
d1 := []byte("hello\ngo\n")
err = ioutil.WriteFile(path, d1, 0644)
if err != nil {
t.Fatal(err)
}
err = f.notify.Add(path)
if err != nil {
t.Fatal(err)
}
d2 := []byte("hello\nworld\n")
err = ioutil.WriteFile(path, d2, 0644)
if err != nil {
t.Fatal(err)
}
f.fsync()
f.assertEvents(create(path))
}
type notifyFixture struct {
t *testing.T
root *TempDir
watched *TempDir
notify Notify
events []fsnotify.Event
}
func newNotifyFixture(t *testing.T) *notifyFixture {
SetLimitChecksEnabled(false)
notify, err := NewWatcher()
if err != nil {
t.Fatal(err)
}
root, err := NewDir(t.Name())
if err != nil {
t.Fatal(err)
}
watched, err := root.NewDir("watched")
if err != nil {
t.Fatal(err)
}
err = notify.Add(watched.Path())
if err != nil {
t.Fatal(err)
}
return &notifyFixture{
t: t,
root: root,
watched: watched,
notify: notify,
}
}
func (f *notifyFixture) filterJustCreateEvents() {
var r []fsnotify.Event
for _, ev := range f.events {
if ev.Op != fsnotify.Create {
continue
}
r = append(r, ev)
}
f.events = r
}
func (f *notifyFixture) assertEvents(expected ...fsnotify.Event) {
if len(f.events) != len(expected) {
f.t.Fatalf("Got %d events (expected %d): %v %v", len(f.events), len(expected), f.events, expected)
}
for i, actual := range f.events {
if actual != expected[i] {
f.t.Fatalf("Got event %v (expected %v)", actual, expected[i])
}
}
}
func create(f string) fsnotify.Event {
return fsnotify.Event{
Name: f,
Op: fsnotify.Create,
}
}
func write(f string) fsnotify.Event {
return fsnotify.Event{
Name: f,
Op: fsnotify.Write,
}
}
func remove(f string) fsnotify.Event {
return fsnotify.Event{
Name: f,
Op: fsnotify.Remove,
}
}
func (f *notifyFixture) fsync() {
syncPathBase := fmt.Sprintf("sync-%d.txt", time.Now().UnixNano())
syncPath := filepath.Join(f.watched.Path(), syncPathBase)
anySyncPath := filepath.Join(f.watched.Path(), "sync-")
timeout := time.After(time.Second)
err := ioutil.WriteFile(syncPath, []byte(fmt.Sprintf("%s", time.Now())), os.FileMode(0777))
if err != nil {
f.t.Fatal(err)
}
F:
for {
select {
case err := <-f.notify.Errors():
f.t.Fatal(err)
case event := <-f.notify.Events():
if strings.Contains(event.Name, syncPath) {
break F
}
if strings.Contains(event.Name, anySyncPath) {
continue
}
f.events = append(f.events, event)
case <-timeout:
f.t.Fatalf("fsync: timeout")
}
}
if err != nil {
f.t.Fatal(err)
}
}
func (f *notifyFixture) tearDown() {
SetLimitChecksEnabled(true)
err := f.root.TearDown()
if err != nil {
f.t.Fatal(err)
}
err = f.notify.Close()
if err != nil {
f.t.Fatal(err)
}
}

24
pkg/watch/ospath.go Normal file
View File

@ -0,0 +1,24 @@
package watch
import (
"os"
"path/filepath"
"strings"
)
func pathIsChildOf(path string, parent string) bool {
relPath, err := filepath.Rel(parent, path)
if err != nil {
return true
}
if relPath == "." {
return true
}
if filepath.IsAbs(relPath) || strings.HasPrefix(relPath, ".."+string(os.PathSeparator)) {
return false
}
return true
}

73
pkg/watch/temp.go Normal file
View File

@ -0,0 +1,73 @@
package watch
import (
"io/ioutil"
"os"
"path/filepath"
)
// TempDir holds a temp directory and allows easy access to new temp directories.
type TempDir struct {
dir string
}
// NewDir creates a new TempDir in the default location (typically $TMPDIR)
func NewDir(prefix string) (*TempDir, error) {
return NewDirAtRoot("", prefix)
}
// NewDir creates a new TempDir at the given root.
func NewDirAtRoot(root, prefix string) (*TempDir, error) {
tmpDir, err := ioutil.TempDir(root, prefix)
if err != nil {
return nil, err
}
realTmpDir, err := filepath.EvalSymlinks(tmpDir)
if err != nil {
return nil, err
}
return &TempDir{dir: realTmpDir}, nil
}
// NewDirAtSlashTmp creates a new TempDir at /tmp
func NewDirAtSlashTmp(prefix string) (*TempDir, error) {
fullyResolvedPath, err := filepath.EvalSymlinks("/tmp")
if err != nil {
return nil, err
}
return NewDirAtRoot(fullyResolvedPath, prefix)
}
// d.NewDir creates a new TempDir under d
func (d *TempDir) NewDir(prefix string) (*TempDir, error) {
d2, err := ioutil.TempDir(d.dir, prefix)
if err != nil {
return nil, err
}
return &TempDir{d2}, nil
}
func (d *TempDir) NewDeterministicDir(name string) (*TempDir, error) {
d2 := filepath.Join(d.dir, name)
err := os.Mkdir(d2, 0700)
if os.IsExist(err) {
return nil, err
} else if err != nil {
return nil, err
}
return &TempDir{d2}, nil
}
func (d *TempDir) TearDown() error {
return os.RemoveAll(d.dir)
}
func (d *TempDir) Path() string {
return d.dir
}
// Possible extensions:
// temp file
// named directories or files (e.g., we know we want one git repo for our object, but it should be temporary)

156
pkg/watch/watcher_darwin.go Normal file
View File

@ -0,0 +1,156 @@
package watch
import (
"path/filepath"
"sync"
"time"
"github.com/windmilleng/fsevents"
"github.com/windmilleng/fsnotify"
)
type darwinNotify struct {
stream *fsevents.EventStream
events chan fsnotify.Event
errors chan error
stop chan struct{}
// TODO(nick): This mutex is needed for the case where we add paths after we
// start watching. But because fsevents supports recursive watches, we don't
// actually need this feature. We should change the api contract of wmNotify
// so that, for recursive watches, we can guarantee that the path list doesn't
// change.
sm *sync.Mutex
}
func (d *darwinNotify) isTrackingPath(path string) bool {
d.sm.Lock()
defer d.sm.Unlock()
for _, p := range d.stream.Paths {
if p == path {
return true
}
}
return false
}
func (d *darwinNotify) loop() {
lastCreate := ""
ignoredSpuriousEvent := false
for {
select {
case <-d.stop:
return
case events, ok := <-d.stream.Events:
if !ok {
return
}
for _, e := range events {
e.Path = filepath.Join("/", e.Path)
op := eventFlagsToOp(e.Flags)
// Sometimes we get duplicate CREATE events.
//
// This is exercised by TestEventOrdering, which creates lots of files
// and generates duplicate CREATE events for some of them.
if op == fsnotify.Create {
if lastCreate == e.Path {
continue
}
lastCreate = e.Path
}
// ignore the first event that says the watched directory
// has been created. these are fired spuriously on initiation.
if op == fsnotify.Create {
if d.isTrackingPath(e.Path) && !ignoredSpuriousEvent {
ignoredSpuriousEvent = true
continue
}
}
d.events <- fsnotify.Event{
Name: e.Path,
Op: op,
}
}
}
}
}
func (d *darwinNotify) Add(name string) error {
d.sm.Lock()
defer d.sm.Unlock()
es := d.stream
// Check if this is a subdirectory of any of the paths
// we're already watching.
for _, parent := range es.Paths {
isChild := pathIsChildOf(name, parent)
if isChild {
return nil
}
}
es.Paths = append(es.Paths, name)
if len(es.Paths) == 1 {
go d.loop()
es.Start()
} else {
es.Restart()
}
return nil
}
func (d *darwinNotify) Close() error {
d.sm.Lock()
defer d.sm.Unlock()
d.stream.Stop()
close(d.errors)
close(d.stop)
return nil
}
func (d *darwinNotify) Events() chan fsnotify.Event {
return d.events
}
func (d *darwinNotify) Errors() chan error {
return d.errors
}
func NewWatcher() (Notify, error) {
dw := &darwinNotify{
stream: &fsevents.EventStream{
Latency: 1 * time.Millisecond,
Flags: fsevents.FileEvents,
},
sm: &sync.Mutex{},
events: make(chan fsnotify.Event),
errors: make(chan error),
stop: make(chan struct{}),
}
return dw, nil
}
func eventFlagsToOp(flags fsevents.EventFlags) fsnotify.Op {
if flags&fsevents.ItemRemoved != 0 {
return fsnotify.Remove
}
if flags&fsevents.ItemRenamed != 0 {
return fsnotify.Rename
}
if flags&fsevents.ItemChangeOwner != 0 {
return fsnotify.Chmod
}
if flags&fsevents.ItemCreated != 0 {
return fsnotify.Create
}
return fsnotify.Write
}

173
pkg/watch/watcher_linux.go Normal file
View File

@ -0,0 +1,173 @@
package watch
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/windmilleng/fsnotify"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
const enospc = "no space left on device"
const inotifyErrMsg = "The user limit on the total number of inotify watches was reached; increase the fs.inotify.max_user_watches sysctl. See here for more information: https://facebook.github.io/watchman/docs/install.html#linux-inotify-limits"
const inotifyMin = 8192
type linuxNotify struct {
watcher *fsnotify.Watcher
events chan fsnotify.Event
wrappedEvents chan fsnotify.Event
errors chan error
watchList map[string]bool
}
func (d *linuxNotify) Add(name string) error {
fi, err := os.Stat(name)
if err != nil && !os.IsNotExist(err) {
return err
}
// if it's a file that doesn't exist watch it's parent
if os.IsNotExist(err) {
parent := filepath.Join(name, "..")
err = d.watcher.Add(parent)
if err != nil {
return err
}
d.watchList[parent] = true
} else if fi.IsDir() {
err = d.watchRecursively(name)
if err != nil {
return err
}
d.watchList[name] = true
} else {
err = d.watcher.Add(name)
if err != nil {
return err
}
d.watchList[name] = true
}
return nil
}
func (d *linuxNotify) watchRecursively(dir string) error {
return filepath.Walk(dir, func(path string, mode os.FileInfo, err error) error {
if err != nil {
return err
}
return d.watcher.Add(path)
})
}
func (d *linuxNotify) Close() error {
return d.watcher.Close()
}
func (d *linuxNotify) Events() chan fsnotify.Event {
return d.wrappedEvents
}
func (d *linuxNotify) Errors() chan error {
return d.errors
}
func (d *linuxNotify) loop() {
for e := range d.events {
if e.Op&fsnotify.Create == fsnotify.Create && isDir(e.Name) {
err := filepath.Walk(e.Name, func(path string, mode os.FileInfo, err error) error {
if err != nil {
return err
}
newE := fsnotify.Event{
Op: fsnotify.Create,
Name: path,
}
d.sendEventIfWatched(newE)
// TODO(dmiller): symlinks 😭
err = d.Add(path)
if err != nil {
log.Printf("Error watching path %s: %s", e.Name, err)
}
return nil
})
if err != nil {
log.Printf("Error walking directory %s: %s", e.Name, err)
}
} else {
d.sendEventIfWatched(e)
}
}
}
func (d *linuxNotify) sendEventIfWatched(e fsnotify.Event) {
if _, ok := d.watchList[e.Name]; ok {
d.wrappedEvents <- e
} else {
// TODO(dmiller): maybe use a prefix tree here?
for path := range d.watchList {
if pathIsChildOf(e.Name, path) {
d.wrappedEvents <- e
break
}
}
}
}
func NewWatcher() (*linuxNotify, error) {
fsw, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
wrappedEvents := make(chan fsnotify.Event)
wmw := &linuxNotify{
watcher: fsw,
events: fsw.Events,
wrappedEvents: wrappedEvents,
errors: fsw.Errors,
watchList: map[string]bool{},
}
go wmw.loop()
return wmw, nil
}
func isDir(pth string) bool {
fi, _ := os.Stat(pth)
return fi.IsDir()
}
func checkInotifyLimits() error {
if !LimitChecksEnabled() {
return nil
}
data, err := ioutil.ReadFile("/proc/sys/fs/inotify/max_user_watches")
if err != nil {
return err
}
i, err := strconv.Atoi(strings.TrimSpace(string(data)))
if err != nil {
return err
}
if i < inotifyMin {
return grpc.Errorf(
codes.ResourceExhausted,
"The user limit on the total number of inotify watches is too low (%d); increase the fs.inotify.max_user_watches sysctl. See here for more information: https://facebook.github.io/watchman/docs/install.html#linux-inotify-limits",
i,
)
}
return nil
}