From d2034029986278c7daab7e975d743bdfaf7b7b1e Mon Sep 17 00:00:00 2001
From: Milas Bowman <milas.bowman@docker.com>
Date: Tue, 13 Feb 2024 12:56:53 -0500
Subject: [PATCH] chore(watch): remove old `docker cp` implementation

This has not been the default for quite a while and required
setting an environment variable to revert back.

The tar implementation is more performant and addresses several
edge cases with the original `docker cp` version, so it's time
to fully retire it.

The scaffolding for multiple sync implementations remains to
support future experimentation here.

Signed-off-by: Milas Bowman <milas.bowman@docker.com>
---
 internal/sync/docker_cp.go   | 104 ------------------------
 internal/sync/writer.go      |  91 ---------------------
 internal/sync/writer_test.go | 152 -----------------------------------
 pkg/compose/watch.go         |  21 +++--
 pkg/e2e/watch_test.go        |  33 +++-----
 5 files changed, 22 insertions(+), 379 deletions(-)
 delete mode 100644 internal/sync/docker_cp.go
 delete mode 100644 internal/sync/writer.go
 delete mode 100644 internal/sync/writer_test.go

diff --git a/internal/sync/docker_cp.go b/internal/sync/docker_cp.go
deleted file mode 100644
index 47077b404..000000000
--- a/internal/sync/docker_cp.go
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-   Copyright 2023 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-       http://www.apache.org/licenses/LICENSE-2.0
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package sync
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"io"
-	"io/fs"
-	"os"
-
-	"github.com/compose-spec/compose-go/v2/types"
-	"github.com/docker/compose/v2/pkg/api"
-	"github.com/sirupsen/logrus"
-)
-
-type ComposeClient interface {
-	Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error)
-
-	Copy(ctx context.Context, projectName string, options api.CopyOptions) error
-}
-
-type DockerCopy struct {
-	client ComposeClient
-
-	projectName string
-
-	infoWriter io.Writer
-}
-
-var _ Syncer = &DockerCopy{}
-
-func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy {
-	return &DockerCopy{
-		projectName: projectName,
-		client:      client,
-		infoWriter:  infoWriter,
-	}
-}
-
-func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
-	var errs []error
-	for i := range paths {
-		if err := d.sync(ctx, service, paths[i]); err != nil {
-			errs = append(errs, err)
-		}
-	}
-	return errors.Join(errs...)
-}
-
-func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error {
-	scale := service.GetScale()
-
-	if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil {
-		if fi.IsDir() {
-			for i := 1; i <= scale; i++ {
-				_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
-					Service: service.Name,
-					Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
-					Index:   i,
-				})
-				if err != nil {
-					logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
-				}
-			}
-			fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
-		} else {
-			err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
-				Source:      pathMapping.HostPath,
-				Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath),
-			})
-			if err != nil {
-				return err
-			}
-			fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath)
-		}
-	} else if errors.Is(statErr, fs.ErrNotExist) {
-		for i := 1; i <= scale; i++ {
-			_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
-				Service: service.Name,
-				Command: []string{"rm", "-rf", pathMapping.ContainerPath},
-				Index:   i,
-			})
-			if err != nil {
-				logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
-			}
-		}
-		fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
-	}
-	return nil
-}
diff --git a/internal/sync/writer.go b/internal/sync/writer.go
deleted file mode 100644
index f5c182d1b..000000000
--- a/internal/sync/writer.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-   Copyright 2023 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package sync
-
-import (
-	"errors"
-	"io"
-)
-
-// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter
-// instances.
-//
-// If a writer fails during a Write call, the write-side of the pipe is then
-// closed with the error and no subsequent attempts are made to write to the
-// pipe.
-//
-// If all writers fail during a write, an error is returned.
-//
-// On Close, any remaining writers are closed.
-type lossyMultiWriter struct {
-	writers []*io.PipeWriter
-}
-
-// newLossyMultiWriter creates a new writer that *attempts* to tee all data written to it to the provided io.PipeWriter
-// instances. Rather than failing a write operation if any writer fails, writes only fail if there are no more valid
-// writers. Otherwise, errors for specific writers are propagated via CloseWithError.
-func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter {
-	// reverse the writers because during the write we iterate
-	// backwards, so this way we'll end up writing in the same
-	// order as the writers were passed to us
-	writers = append([]*io.PipeWriter(nil), writers...)
-	for i, j := 0, len(writers)-1; i < j; i, j = i+1, j-1 {
-		writers[i], writers[j] = writers[j], writers[i]
-	}
-
-	return &lossyMultiWriter{
-		writers: writers,
-	}
-}
-
-// Write writes to each writer that is still active (i.e. has not failed/encountered an error on write).
-//
-// If a writer encounters an error during the write, the write side of the pipe is closed with the error
-// and no subsequent attempts will be made to write to that writer.
-//
-// An error is only returned from this function if ALL writers have failed.
-func (l *lossyMultiWriter) Write(p []byte) (int, error) {
-	// NOTE: this function iterates backwards so that it can
-	// 	safely remove elements during the loop
-	for i := len(l.writers) - 1; i >= 0; i-- {
-		written, err := l.writers[i].Write(p)
-		if err == nil && written != len(p) {
-			err = io.ErrShortWrite
-		}
-		if err != nil {
-			// pipe writer close cannot fail
-			_ = l.writers[i].CloseWithError(err)
-			l.writers = append(l.writers[:i], l.writers[i+1:]...)
-		}
-	}
-
-	if len(l.writers) == 0 {
-		return 0, errors.New("no writers remaining")
-	}
-
-	return len(p), nil
-}
-
-// Close closes any still open (non-failed) writers.
-//
-// Failed writers have already been closed with an error.
-func (l *lossyMultiWriter) Close() {
-	for i := range l.writers {
-		// pipe writer close cannot fail
-		_ = l.writers[i].Close()
-	}
-}
diff --git a/internal/sync/writer_test.go b/internal/sync/writer_test.go
deleted file mode 100644
index b6de694c7..000000000
--- a/internal/sync/writer_test.go
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
-   Copyright 2023 Docker Compose CLI authors
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package sync
-
-import (
-	"context"
-	"errors"
-	"io"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestLossyMultiWriter(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	t.Cleanup(cancel)
-
-	const count = 5
-	readers := make([]*bufReader, count)
-	writers := make([]*io.PipeWriter, count)
-	for i := 0; i < count; i++ {
-		r, w := io.Pipe()
-		readers[i] = newBufReader(ctx, r)
-		writers[i] = w
-	}
-
-	w := newLossyMultiWriter(writers...)
-	t.Cleanup(w.Close)
-	n, err := w.Write([]byte("hello world"))
-	require.Equal(t, 11, n)
-	require.NoError(t, err)
-	for i := range readers {
-		readers[i].waitForWrite(t)
-		require.Equal(t, "hello world", string(readers[i].contents()))
-		readers[i].reset()
-	}
-
-	// even if a writer fails (in this case simulated by closing the receiving end of the pipe),
-	// write operations should continue to return nil error but the writer should be closed
-	// with an error
-	const failIndex = 3
-	require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
-	n, err = w.Write([]byte("hello"))
-	require.Equal(t, 5, n)
-	require.NoError(t, err)
-	for i := range readers {
-		readers[i].waitForWrite(t)
-		if i == failIndex {
-			err := readers[i].error()
-			require.EqualError(t, err, "io: read/write on closed pipe")
-			require.Empty(t, readers[i].contents())
-		} else {
-			require.Equal(t, "hello", string(readers[i].contents()))
-		}
-	}
-
-	// perform another write, verify there's still no errors
-	n, err = w.Write([]byte(" world"))
-	require.Equal(t, 6, n)
-	require.NoError(t, err)
-}
-
-type bufReader struct {
-	ctx       context.Context
-	r         *io.PipeReader
-	mu        sync.Mutex
-	err       error
-	data      []byte
-	writeSync chan struct{}
-}
-
-func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
-	b := &bufReader{
-		ctx:       ctx,
-		r:         r,
-		writeSync: make(chan struct{}),
-	}
-	go b.consume()
-	return b
-}
-
-func (b *bufReader) waitForWrite(t testing.TB) {
-	t.Helper()
-	select {
-	case <-b.writeSync:
-		return
-	case <-time.After(50 * time.Millisecond):
-		t.Fatal("timed out waiting for write")
-	}
-}
-
-func (b *bufReader) consume() {
-	defer close(b.writeSync)
-	for {
-		buf := make([]byte, 512)
-		n, err := b.r.Read(buf)
-		if n != 0 {
-			b.mu.Lock()
-			b.data = append(b.data, buf[:n]...)
-			b.mu.Unlock()
-		}
-		if errors.Is(err, io.EOF) {
-			return
-		}
-		if err != nil {
-			b.mu.Lock()
-			b.err = err
-			b.mu.Unlock()
-			return
-		}
-		// prevent goroutine leak, tie lifetime to the test
-		select {
-		case b.writeSync <- struct{}{}:
-		case <-b.ctx.Done():
-			return
-		}
-	}
-}
-
-func (b *bufReader) contents() []byte {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	return b.data
-}
-
-func (b *bufReader) reset() {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	b.data = nil
-}
-
-func (b *bufReader) error() error {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	return b.err
-}
diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go
index 3673c4355..2e783026e 100644
--- a/pkg/compose/watch.go
+++ b/pkg/compose/watch.go
@@ -46,21 +46,23 @@ type fileEvent struct {
 	Action types.WatchAction
 }
 
-// getSyncImplementation returns the the tar-based syncer unless it has been explicitly
-// disabled with `COMPOSE_EXPERIMENTAL_WATCH_TAR=0`. Note that the absence of the env
-// var means enabled.
-func (s *composeService) getSyncImplementation(project *types.Project) sync.Syncer {
+// getSyncImplementation returns an appropriate sync implementation for the
+// project.
+//
+// Currently, an implementation that batches files and transfers them using
+// the Moby `Untar` API.
+func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
 	var useTar bool
 	if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
 		useTar, _ = strconv.ParseBool(useTarEnv)
 	} else {
 		useTar = true
 	}
-	if useTar {
-		return sync.NewTar(project.Name, tarDockerClient{s: s})
+	if !useTar {
+		return nil, errors.New("no available sync implementation")
 	}
 
-	return sync.NewDockerCopy(project.Name, s, s.stdinfo())
+	return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
 }
 
 func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
@@ -68,7 +70,10 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
 	if project, err = project.WithSelectedServices(services); err != nil {
 		return err
 	}
-	syncer := s.getSyncImplementation(project)
+	syncer, err := s.getSyncImplementation(project)
+	if err != nil {
+		return err
+	}
 	eg, ctx := errgroup.WithContext(ctx)
 	watching := false
 	for i := range project.Services {
diff --git a/pkg/e2e/watch_test.go b/pkg/e2e/watch_test.go
index 0740e98c2..677b080a0 100644
--- a/pkg/e2e/watch_test.go
+++ b/pkg/e2e/watch_test.go
@@ -21,7 +21,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"strconv"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -38,23 +37,12 @@ func TestWatch(t *testing.T) {
 	t.Skip("Skipping watch tests until we can figure out why they are flaky/failing")
 
 	services := []string{"alpine", "busybox", "debian"}
-	t.Run("docker cp", func(t *testing.T) {
-		for _, svcName := range services {
-			t.Run(svcName, func(t *testing.T) {
-				t.Helper()
-				doTest(t, svcName, false)
-			})
-		}
-	})
-
-	t.Run("tar", func(t *testing.T) {
-		for _, svcName := range services {
-			t.Run(svcName, func(t *testing.T) {
-				t.Helper()
-				doTest(t, svcName, true)
-			})
-		}
-	})
+	for _, svcName := range services {
+		t.Run(svcName, func(t *testing.T) {
+			t.Helper()
+			doTest(t, svcName)
+		})
+	}
 }
 
 func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
@@ -150,8 +138,9 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
 
 }
 
-// NOTE: these tests all share a single Compose file but are safe to run concurrently
-func doTest(t *testing.T, svcName string, tarSync bool) {
+// NOTE: these tests all share a single Compose file but are safe to run
+// concurrently (though that's not recommended).
+func doTest(t *testing.T, svcName string) {
 	tmpdir := t.TempDir()
 	dataDir := filepath.Join(tmpdir, "data")
 	configDir := filepath.Join(tmpdir, "config")
@@ -171,13 +160,9 @@ func doTest(t *testing.T, svcName string, tarSync bool) {
 	CopyFile(t, filepath.Join("fixtures", "watch", "compose.yaml"), composeFilePath)
 
 	projName := "e2e-watch-" + svcName
-	if tarSync {
-		projName += "-tar"
-	}
 	env := []string{
 		"COMPOSE_FILE=" + composeFilePath,
 		"COMPOSE_PROJECT_NAME=" + projName,
-		"COMPOSE_EXPERIMENTAL_WATCH_TAR=" + strconv.FormatBool(tarSync),
 	}
 
 	cli := NewCLI(t, WithEnv(env...))