watch: add file delete/rename handling

This approach mimics Tilt's behavior[^1]:
 1. At sync time, `stat` the path on host
 2. If the path does not exist -> `rm` from container
 3. If the path exists -> sync to container

By handling things this way, we're always syncing based on the true
state, regardless of what's happened in the interim. For example, a
common pattern in POSIX tools is to create a file and then rename it
over an existing file. Based on timing, this could be a sync, delete,
sync (every file gets seen & processed) OR a delete, sync (by the
the time we process the event, the "temp" file is already gone, so
we just delete it from the container, where it never existed, but
that's fine since we deletes are idempotent thanks to the `-f` flag
on `rm`).

Additionally, when syncing, if the `stat` call shows it's for a
directory, we ignore it. Otherwise, duplicate, nested copies of the
entire path could get synced in. (On some OSes, an event for the
directory gets dispatched when a file inside of it is modified. In
practice, I think we might want this pushed further down in the
watching code, but since we're already `stat`ing the paths here now,
it's a good place to handle it.)

Lastly, there's some very light changes to the text when it does a
full rebuild that will list out the (merged) set of paths that
triggered it. We can continue to improve the output, but this is
really helpful for understanding why it's rebuilding.

[^1]: db7f887b06/internal/controllers/core/liveupdate/reconciler.go (L911)

Signed-off-by: Milas Bowman <milas.bowman@docker.com>
This commit is contained in:
Milas Bowman 2023-03-20 13:25:19 -04:00
parent 03f0ed132d
commit 105a7c5b70
2 changed files with 105 additions and 43 deletions

View File

@ -17,6 +17,9 @@ package compose
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"time"
@ -50,9 +53,30 @@ type Trigger struct {
const quietPeriod = 2 * time.Second
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint:gocyclo
needRebuild := make(chan string)
needSync := make(chan api.CopyOptions, 5)
// fileMapping contains the Compose service and modified host system path.
//
// For file sync, the container path is also included.
// For rebuild, there is no container path, so it is always empty.
type fileMapping struct {
// service that the file event is for.
service string
// hostPath that was created/modified/deleted outside the container.
//
// This is the path as seen from the user's perspective, e.g.
// - C:\Users\moby\Documents\hello-world\main.go
// - /Users/moby/Documents/hello-world/main.go
hostPath string
// containerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
containerPath string
}
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint:gocyclo
needRebuild := make(chan fileMapping)
needSync := make(chan fileMapping)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
@ -120,38 +144,37 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
case <-ctx.Done():
return nil
case event := <-watcher.Events():
path := event.Path()
hostPath := event.Path()
for _, trigger := range config.Watch {
logrus.Debugf("change detected on %s - comparing with %s", path, trigger.Path)
if watch.IsChild(trigger.Path, path) {
fmt.Fprintf(s.stderr(), "change detected on %s\n", path)
logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path)
if watch.IsChild(trigger.Path, hostPath) {
fmt.Fprintf(s.stderr(), "change detected on %s\n", hostPath)
f := fileMapping{
hostPath: hostPath,
service: name,
}
switch trigger.Action {
case WatchActionSync:
logrus.Debugf("modified file %s triggered sync", path)
rel, err := filepath.Rel(trigger.Path, path)
logrus.Debugf("modified file %s triggered sync", hostPath)
rel, err := filepath.Rel(trigger.Path, hostPath)
if err != nil {
return err
}
dest := filepath.Join(trigger.Target, rel)
needSync <- api.CopyOptions{
Source: path,
Destination: fmt.Sprintf("%s:%s", name, dest),
}
// always use Unix-style paths for inside the container
f.containerPath = path.Join(trigger.Target, rel)
needSync <- f
case WatchActionRebuild:
logrus.Debugf("modified file %s requires image to be rebuilt", path)
needRebuild <- name
logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
needRebuild <- f
default:
return fmt.Errorf("watch action %q is not supported", trigger)
}
continue WATCH
}
}
// default
needRebuild <- name
case err := <-watcher.Errors():
return err
}
@ -183,11 +206,25 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
return config, nil
}
func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services []string) {
return func(services []string) {
fmt.Fprintf(s.stderr(), "Updating %s after changes were detected\n", strings.Join(services, ", "))
func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) {
return func(services rebuildServices) {
serviceNames := make([]string, 0, len(services))
allPaths := make(utils.Set[string])
for serviceName, paths := range services {
serviceNames = append(serviceNames, serviceName)
for p := range paths {
allPaths.Add(p)
}
}
fmt.Fprintf(
s.stderr(),
"Rebuilding %s after changes were detected:%s\n",
strings.Join(serviceNames, ", "),
strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "),
)
imageIds, err := s.build(ctx, project, api.BuildOptions{
Services: services,
Services: serviceNames,
})
if err != nil {
fmt.Fprintf(s.stderr(), "Build failed\n")
@ -201,11 +238,11 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
err = s.Up(ctx, project, api.UpOptions{
Create: api.CreateOptions{
Services: services,
Services: serviceNames,
Inherit: true,
},
Start: api.StartOptions{
Services: services,
Services: serviceNames,
Project: project,
},
})
@ -215,39 +252,61 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
}
}
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync chan api.CopyOptions) func() error {
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
return func() error {
for {
select {
case <-ctx.Done():
return nil
case opt := <-needSync:
err := s.Copy(ctx, project.Name, opt)
if err != nil {
return err
if fi, statErr := os.Stat(opt.hostPath); statErr == nil && !fi.IsDir() {
err := s.Copy(ctx, project.Name, api.CopyOptions{
Source: opt.hostPath,
Destination: fmt.Sprintf("%s:%s", opt.service, opt.containerPath),
})
if err != nil {
return err
}
fmt.Fprintf(s.stderr(), "%s updated\n", opt.containerPath)
} else if errors.Is(statErr, fs.ErrNotExist) {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.service,
Command: []string{"rm", "-rf", opt.containerPath},
Index: 1,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", opt.containerPath, opt.service, err)
}
fmt.Fprintf(s.stderr(), "%s deleted from container\n", opt.containerPath)
}
fmt.Fprintf(s.stderr(), "%s updated\n", opt.Destination)
}
}
}
}
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input chan string, fn func(services []string)) {
services := utils.Set[string]{}
type rebuildServices map[string]utils.Set[string]
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
services := make(rebuildServices)
t := clock.AfterFunc(delay, func() {
if len(services) > 0 {
refresh := services.Elements()
services.Clear()
fn(refresh)
fn(services)
// TODO(milas): this is a data race!
services = make(rebuildServices)
}
})
for {
select {
case <-ctx.Done():
return
case service := <-input:
case e := <-input:
t.Reset(delay)
services.Add(service)
svc, ok := services[e.service]
if !ok {
svc = make(utils.Set[string])
services[e.service] = svc
}
svc.Add(e.hostPath)
}
}
}

View File

@ -24,24 +24,27 @@ import (
)
func Test_debounce(t *testing.T) {
ch := make(chan string)
ch := make(chan fileMapping)
var (
ran int
got []string
)
clock := clockwork.NewFakeClock()
ctx, stop := context.WithCancel(context.TODO())
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
debounce(ctx, clock, quietPeriod, ch, func(services []string) {
got = append(got, services...)
debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) {
for svc := range services {
got = append(got, svc)
}
ran++
stop()
})
return nil
})
for i := 0; i < 100; i++ {
ch <- "test"
ch <- fileMapping{service: "test"}
}
assert.Equal(t, ran, 0)
clock.Advance(quietPeriod)