diff --git a/internal/sync/tar.go b/internal/sync/tar.go new file mode 100644 index 000000000..3296783ee --- /dev/null +++ b/internal/sync/tar.go @@ -0,0 +1,344 @@ +/* + Copyright 2018 The Tilt Dev Authors + Copyright 2023 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package sync + +import ( + "archive/tar" + "bytes" + "context" + "fmt" + "io" + "io/fs" + "os" + "path" + "path/filepath" + "strings" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + + "github.com/compose-spec/compose-go/types" + moby "github.com/docker/docker/api/types" + "github.com/docker/docker/pkg/archive" +) + +type archiveEntry struct { + path string + info os.FileInfo + header *tar.Header +} + +type LowLevelClient interface { + ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) + + Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error +} + +type Tar struct { + client LowLevelClient + + projectName string +} + +var _ Syncer = &Tar{} + +func NewTar(projectName string, client LowLevelClient) *Tar { + return &Tar{ + projectName: projectName, + client: client, + } +} + +func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error { + containers, err := t.client.ContainersForService(ctx, t.projectName, service.Name) + if err != nil { + return err + } + + var pathsToCopy []PathMapping + var pathsToDelete []string + for _, p := range paths { + if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) { + pathsToDelete = append(pathsToDelete, p.ContainerPath) + } else { + pathsToCopy = append(pathsToCopy, p) + } + } + + // TODO: this can't be read from multiple times + tarReader := tarArchive(pathsToCopy) + + var deleteCmd []string + if len(pathsToDelete) != 0 { + deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...) + } + copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"} + + var eg multierror.Group + for i := range containers { + containerID := containers[i].ID + eg.Go(func() error { + if len(deleteCmd) != 0 { + if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil { + return fmt.Errorf("deleting paths in %s: %w", containerID, err) + } + } + if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil { + return fmt.Errorf("copying files to %s: %w", containerID, err) + } + return nil + }) + } + return eg.Wait().ErrorOrNil() +} + +type ArchiveBuilder struct { + tw *tar.Writer + paths []string // local paths archived + + // A shared I/O buffer to help with file copying. + copyBuf *bytes.Buffer +} + +func NewArchiveBuilder(writer io.Writer) *ArchiveBuilder { + tw := tar.NewWriter(writer) + return &ArchiveBuilder{ + tw: tw, + copyBuf: &bytes.Buffer{}, + } +} + +func (a *ArchiveBuilder) Close() error { + return a.tw.Close() +} + +// ArchivePathsIfExist creates a tar archive of all local files in `paths`. It quietly skips any paths that don't exist. +func (a *ArchiveBuilder) ArchivePathsIfExist(paths []PathMapping) error { + // In order to handle overlapping syncs, we + // 1) collect all the entries, + // 2) de-dupe them, with last-one-wins semantics + // 3) write all the entries + // + // It's not obvious that this is the correct behavior. A better approach + // (that's more in-line with how syncs work) might ignore files in earlier + // path mappings when we know they're going to be "synced" over. + // There's a bunch of subtle product decisions about how overlapping path + // mappings work that we're not sure about. + var entries []archiveEntry + for _, p := range paths { + newEntries, err := a.entriesForPath(p.HostPath, p.ContainerPath) + if err != nil { + return fmt.Errorf("inspecting %q: %w", p.HostPath, err) + } + + entries = append(entries, newEntries...) + } + + entries = dedupeEntries(entries) + for _, entry := range entries { + err := a.writeEntry(entry) + if err != nil { + return fmt.Errorf("archiving %q: %w", entry.path, err) + } + a.paths = append(a.paths, entry.path) + } + return nil +} + +func (a *ArchiveBuilder) writeEntry(entry archiveEntry) error { + pathInTar := entry.path + header := entry.header + + if header.Typeflag != tar.TypeReg { + // anything other than a regular file (e.g. dir, symlink) just needs the header + if err := a.tw.WriteHeader(header); err != nil { + return fmt.Errorf("writing %q header: %w", pathInTar, err) + } + return nil + } + + file, err := os.Open(pathInTar) + if err != nil { + // In case the file has been deleted since we last looked at it. + if os.IsNotExist(err) { + return nil + } + return err + } + + defer func() { + _ = file.Close() + }() + + // The size header must match the number of contents bytes. + // + // There is room for a race condition here if something writes to the file + // after we've read the file size. + // + // For small files, we avoid this by first copying the file into a buffer, + // and using the size of the buffer to populate the header. + // + // For larger files, we don't want to copy the whole thing into a buffer, + // because that would blow up heap size. There is some danger that this + // will lead to a spurious error when the tar writer validates the sizes. + // That error will be disruptive but will be handled as best as we + // can downstream. + useBuf := header.Size < 5000000 + if useBuf { + a.copyBuf.Reset() + _, err = io.Copy(a.copyBuf, file) + if err != nil && err != io.EOF { + return fmt.Errorf("copying %q: %w", pathInTar, err) + } + header.Size = int64(len(a.copyBuf.Bytes())) + } + + // wait to write the header until _after_ the file is successfully opened + // to avoid generating an invalid tar entry that has a header but no contents + // in the case the file has been deleted + err = a.tw.WriteHeader(header) + if err != nil { + return fmt.Errorf("writing %q header: %w", pathInTar, err) + } + + if useBuf { + _, err = io.Copy(a.tw, a.copyBuf) + } else { + _, err = io.Copy(a.tw, file) + } + + if err != nil && err != io.EOF { + return fmt.Errorf("copying %q: %w", pathInTar, err) + } + + // explicitly flush so that if the entry is invalid we will detect it now and + // provide a more meaningful error + if err := a.tw.Flush(); err != nil { + return fmt.Errorf("finalizing %q: %w", pathInTar, err) + } + return nil +} + +// tarPath writes the given source path into tarWriter at the given dest (recursively for directories). +// e.g. tarring my_dir --> dest d: d/file_a, d/file_b +// If source path does not exist, quietly skips it and returns no err +func (a *ArchiveBuilder) entriesForPath(localPath, containerPath string) ([]archiveEntry, error) { + localInfo, err := os.Stat(localPath) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + localPathIsDir := localInfo.IsDir() + if localPathIsDir { + // Make sure we can trim this off filenames to get valid relative filepaths + if !strings.HasSuffix(localPath, string(filepath.Separator)) { + localPath += string(filepath.Separator) + } + } + + containerPath = strings.TrimPrefix(containerPath, "/") + + result := make([]archiveEntry, 0) + err = filepath.Walk(localPath, func(curLocalPath string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("walking %q: %w", curLocalPath, err) + } + + linkname := "" + if info.Mode()&os.ModeSymlink != 0 { + var err error + linkname, err = os.Readlink(curLocalPath) + if err != nil { + return err + } + } + + var name string + //nolint:gocritic + if localPathIsDir { + // Name of file in tar should be relative to source directory... + tmp, err := filepath.Rel(localPath, curLocalPath) + if err != nil { + return fmt.Errorf("making %q relative to %q: %w", curLocalPath, localPath, err) + } + // ...and live inside `dest` + name = path.Join(containerPath, filepath.ToSlash(tmp)) + } else if strings.HasSuffix(containerPath, "/") { + name = containerPath + filepath.Base(curLocalPath) + } else { + name = containerPath + } + + header, err := archive.FileInfoHeader(name, info, linkname) + if err != nil { + // Not all types of files are allowed in a tarball. That's OK. + // Mimic the Docker behavior and just skip the file. + return nil + } + + result = append(result, archiveEntry{ + path: curLocalPath, + info: info, + header: header, + }) + + return nil + }) + if err != nil { + return nil, err + } + return result, nil +} + +func tarArchive(ops []PathMapping) io.ReadCloser { + pr, pw := io.Pipe() + go func() { + ab := NewArchiveBuilder(pw) + err := ab.ArchivePathsIfExist(ops) + if err != nil { + _ = pw.CloseWithError(fmt.Errorf("adding files to tar: %w", err)) + } else { + // propagate errors from the TarWriter::Close() because it performs a final + // Flush() and any errors mean the tar is invalid + if err := ab.Close(); err != nil { + _ = pw.CloseWithError(fmt.Errorf("closing tar: %w", err)) + } else { + _ = pw.Close() + } + } + }() + return pr +} + +// Dedupe the entries with last-entry-wins semantics. +func dedupeEntries(entries []archiveEntry) []archiveEntry { + seenIndex := make(map[string]int, len(entries)) + result := make([]archiveEntry, 0, len(entries)) + for i, entry := range entries { + seenIndex[entry.header.Name] = i + } + for i, entry := range entries { + if seenIndex[entry.header.Name] == i { + result = append(result, entry) + } + } + return result +} diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 77a5e13f9..f49bcbfe4 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -17,11 +17,16 @@ package compose import ( "context" "fmt" + "io" + "os" "path" "path/filepath" + "strconv" "strings" "time" + moby "github.com/docker/docker/api/types" + "github.com/docker/compose/v2/internal/sync" "github.com/compose-spec/compose-go/types" @@ -185,11 +190,11 @@ WATCH: hostPath := event.Path() for i, trigger := range triggers { - logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path) + logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path) if watch.IsChild(trigger.Path, hostPath) { - match, err := ignores[i].Matches(hostPath) if err != nil { + logrus.Warnf("error ignore matching %q: %v", hostPath, err) return err } @@ -198,6 +203,7 @@ WATCH: continue } + logrus.Infof("change for %q", hostPath) fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath) switch trigger.Action { @@ -241,9 +247,18 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) if err != nil { return nil, err } + baseDir, err := filepath.EvalSymlinks(project.WorkingDir) + if err != nil { + return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err) + } + for i, trigger := range config.Watch { if !filepath.IsAbs(trigger.Path) { - trigger.Path = filepath.Join(project.WorkingDir, trigger.Path) + trigger.Path = filepath.Join(baseDir, trigger.Path) + } + if p, err := filepath.EvalSymlinks(trigger.Path); err == nil { + // this might fail because the path doesn't exist, etc. + trigger.Path = p } trigger.Path = filepath.Clean(trigger.Path) if trigger.Path == "" { @@ -301,19 +316,24 @@ func (s *composeService) makeSyncFn( project *types.Project, needSync <-chan sync.PathMapping, ) func() error { - syncer := sync.NewDockerCopy(project.Name, s, s.stdinfo()) + var syncer sync.Syncer + if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar { + syncer = sync.NewTar(project.Name, tarDockerClient{s: s}) + } else { + syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo()) + } return func() error { for { select { case <-ctx.Done(): return nil - case pathMapping := <-needSync: - service, err := project.GetService(pathMapping.Service) + case op := <-needSync: + service, err := project.GetService(op.Service) if err != nil { return err } - if err := syncer.Sync(ctx, service, []sync.PathMapping{pathMapping}); err != nil { + if err := syncer.Sync(ctx, service, []sync.PathMapping{op}); err != nil { return err } } @@ -356,3 +376,72 @@ func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolu } return false } + +type tarDockerClient struct { + s *composeService +} + +func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) { + containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName) + if err != nil { + return nil, err + } + return containers, nil +} + +func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error { + execCfg := moby.ExecConfig{ + Cmd: cmd, + AttachStdout: false, + AttachStderr: true, + AttachStdin: in != nil, + Tty: false, + } + execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg) + if err != nil { + return err + } + + startCheck := moby.ExecStartCheck{Tty: false, Detach: false} + conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck) + if err != nil { + return err + } + defer conn.Close() + + var eg errgroup.Group + if in != nil { + eg.Go(func() error { + defer func() { + _ = conn.CloseWrite() + }() + _, err := io.Copy(conn.Conn, in) + return err + }) + } + eg.Go(func() error { + _, err := io.Copy(t.s.stdinfo(), conn.Reader) + return err + }) + + err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck) + if err != nil { + return err + } + + // although the errgroup is not tied directly to the context, the operations + // in it are reading/writing to the connection, which is tied to the context, + // so they won't block indefinitely + if err := eg.Wait(); err != nil { + return err + } + + execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID) + if err != nil { + return err + } + if execResult.ExitCode != 0 { + return fmt.Errorf("exit code %d", execResult.ExitCode) + } + return nil +} diff --git a/pkg/e2e/watch_test.go b/pkg/e2e/watch_test.go index 5c0595eab..38ce73103 100644 --- a/pkg/e2e/watch_test.go +++ b/pkg/e2e/watch_test.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "path/filepath" - "runtime" "strings" "sync/atomic" "testing" @@ -34,21 +33,28 @@ import ( ) func TestWatch(t *testing.T) { - if runtime.GOOS == "darwin" { - t.Skip("Test currently broken on macOS due to symlink issues (see compose-go#436)") - } - services := []string{"alpine", "busybox", "debian"} - for _, svcName := range services { - t.Run(svcName, func(t *testing.T) { - t.Helper() - doTest(t, svcName) - }) - } + t.Run("docker cp", func(t *testing.T) { + for _, svcName := range services { + t.Run(svcName, func(t *testing.T) { + t.Helper() + doTest(t, svcName, false) + }) + } + }) + + t.Run("tar", func(t *testing.T) { + for _, svcName := range services { + t.Run(svcName, func(t *testing.T) { + t.Helper() + doTest(t, svcName, true) + }) + } + }) } // NOTE: these tests all share a single Compose file but are safe to run concurrently -func doTest(t *testing.T, svcName string) { +func doTest(t *testing.T, svcName string, tarSync bool) { tmpdir := t.TempDir() dataDir := filepath.Join(tmpdir, "data") writeDataFile := func(name string, contents string) { @@ -67,6 +73,9 @@ func doTest(t *testing.T, svcName string) { "COMPOSE_FILE=" + composeFilePath, "COMPOSE_PROJECT_NAME=" + projName, } + if tarSync { + env = append(env, "COMPOSE_EXPERIMENTAL_WATCH_TAR=1") + } cli := NewCLI(t, WithEnv(env...)) @@ -93,7 +102,7 @@ func doTest(t *testing.T, svcName string) { var testComplete atomic.Bool go func() { // if the process exits abnormally before the test is done, fail the test - if err := r.Cmd.Wait(); err != nil && !testComplete.Load() { + if err := r.Cmd.Wait(); err != nil && !t.Failed() && !testComplete.Load() { assert.Check(t, cmp.Nil(err)) } }() @@ -136,8 +145,7 @@ func doTest(t *testing.T, svcName string) { Assert(t, icmd.Expected{ ExitCode: 1, Err: "No such file or directory", - }, - ) + }) t.Logf("Writing to ignored paths") writeDataFile("data.foo", "ignored") diff --git a/pkg/watch/watcher_darwin.go b/pkg/watch/watcher_darwin.go index 2a2c1da9d..d1a2a9c20 100644 --- a/pkg/watch/watcher_darwin.go +++ b/pkg/watch/watcher_darwin.go @@ -121,8 +121,8 @@ func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { dw := &fseventNotify{ ignore: ignore, stream: &fsevents.EventStream{ - Latency: 1 * time.Millisecond, - Flags: fsevents.FileEvents, + Latency: 50 * time.Millisecond, + Flags: fsevents.FileEvents | fsevents.IgnoreSelf, // NOTE(dmiller): this corresponds to the `sinceWhen` parameter in FSEventStreamCreate // https://developer.apple.com/documentation/coreservices/1443980-fseventstreamcreate EventID: fsevents.LatestEventID(),