diff --git a/internal/sync/docker_cp.go b/internal/sync/docker_cp.go deleted file mode 100644 index 47077b404..000000000 --- a/internal/sync/docker_cp.go +++ /dev/null @@ -1,104 +0,0 @@ -/* - Copyright 2023 Docker Compose CLI authors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package sync - -import ( - "context" - "errors" - "fmt" - "io" - "io/fs" - "os" - - "github.com/compose-spec/compose-go/v2/types" - "github.com/docker/compose/v2/pkg/api" - "github.com/sirupsen/logrus" -) - -type ComposeClient interface { - Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error) - - Copy(ctx context.Context, projectName string, options api.CopyOptions) error -} - -type DockerCopy struct { - client ComposeClient - - projectName string - - infoWriter io.Writer -} - -var _ Syncer = &DockerCopy{} - -func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy { - return &DockerCopy{ - projectName: projectName, - client: client, - infoWriter: infoWriter, - } -} - -func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error { - var errs []error - for i := range paths { - if err := d.sync(ctx, service, paths[i]); err != nil { - errs = append(errs, err) - } - } - return errors.Join(errs...) -} - -func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error { - scale := service.GetScale() - - if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil { - if fi.IsDir() { - for i := 1; i <= scale; i++ { - _, err := d.client.Exec(ctx, d.projectName, api.RunOptions{ - Service: service.Name, - Command: []string{"mkdir", "-p", pathMapping.ContainerPath}, - Index: i, - }) - if err != nil { - logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err) - } - } - fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath) - } else { - err := d.client.Copy(ctx, d.projectName, api.CopyOptions{ - Source: pathMapping.HostPath, - Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath), - }) - if err != nil { - return err - } - fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath) - } - } else if errors.Is(statErr, fs.ErrNotExist) { - for i := 1; i <= scale; i++ { - _, err := d.client.Exec(ctx, d.projectName, api.RunOptions{ - Service: service.Name, - Command: []string{"rm", "-rf", pathMapping.ContainerPath}, - Index: i, - }) - if err != nil { - logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err) - } - } - fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath) - } - return nil -} diff --git a/internal/sync/writer.go b/internal/sync/writer.go deleted file mode 100644 index f5c182d1b..000000000 --- a/internal/sync/writer.go +++ /dev/null @@ -1,91 +0,0 @@ -/* - Copyright 2023 Docker Compose CLI authors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package sync - -import ( - "errors" - "io" -) - -// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter -// instances. -// -// If a writer fails during a Write call, the write-side of the pipe is then -// closed with the error and no subsequent attempts are made to write to the -// pipe. -// -// If all writers fail during a write, an error is returned. -// -// On Close, any remaining writers are closed. -type lossyMultiWriter struct { - writers []*io.PipeWriter -} - -// newLossyMultiWriter creates a new writer that *attempts* to tee all data written to it to the provided io.PipeWriter -// instances. Rather than failing a write operation if any writer fails, writes only fail if there are no more valid -// writers. Otherwise, errors for specific writers are propagated via CloseWithError. -func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter { - // reverse the writers because during the write we iterate - // backwards, so this way we'll end up writing in the same - // order as the writers were passed to us - writers = append([]*io.PipeWriter(nil), writers...) - for i, j := 0, len(writers)-1; i < j; i, j = i+1, j-1 { - writers[i], writers[j] = writers[j], writers[i] - } - - return &lossyMultiWriter{ - writers: writers, - } -} - -// Write writes to each writer that is still active (i.e. has not failed/encountered an error on write). -// -// If a writer encounters an error during the write, the write side of the pipe is closed with the error -// and no subsequent attempts will be made to write to that writer. -// -// An error is only returned from this function if ALL writers have failed. -func (l *lossyMultiWriter) Write(p []byte) (int, error) { - // NOTE: this function iterates backwards so that it can - // safely remove elements during the loop - for i := len(l.writers) - 1; i >= 0; i-- { - written, err := l.writers[i].Write(p) - if err == nil && written != len(p) { - err = io.ErrShortWrite - } - if err != nil { - // pipe writer close cannot fail - _ = l.writers[i].CloseWithError(err) - l.writers = append(l.writers[:i], l.writers[i+1:]...) - } - } - - if len(l.writers) == 0 { - return 0, errors.New("no writers remaining") - } - - return len(p), nil -} - -// Close closes any still open (non-failed) writers. -// -// Failed writers have already been closed with an error. -func (l *lossyMultiWriter) Close() { - for i := range l.writers { - // pipe writer close cannot fail - _ = l.writers[i].Close() - } -} diff --git a/internal/sync/writer_test.go b/internal/sync/writer_test.go deleted file mode 100644 index b6de694c7..000000000 --- a/internal/sync/writer_test.go +++ /dev/null @@ -1,152 +0,0 @@ -/* - Copyright 2023 Docker Compose CLI authors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package sync - -import ( - "context" - "errors" - "io" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestLossyMultiWriter(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - const count = 5 - readers := make([]*bufReader, count) - writers := make([]*io.PipeWriter, count) - for i := 0; i < count; i++ { - r, w := io.Pipe() - readers[i] = newBufReader(ctx, r) - writers[i] = w - } - - w := newLossyMultiWriter(writers...) - t.Cleanup(w.Close) - n, err := w.Write([]byte("hello world")) - require.Equal(t, 11, n) - require.NoError(t, err) - for i := range readers { - readers[i].waitForWrite(t) - require.Equal(t, "hello world", string(readers[i].contents())) - readers[i].reset() - } - - // even if a writer fails (in this case simulated by closing the receiving end of the pipe), - // write operations should continue to return nil error but the writer should be closed - // with an error - const failIndex = 3 - require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no"))) - n, err = w.Write([]byte("hello")) - require.Equal(t, 5, n) - require.NoError(t, err) - for i := range readers { - readers[i].waitForWrite(t) - if i == failIndex { - err := readers[i].error() - require.EqualError(t, err, "io: read/write on closed pipe") - require.Empty(t, readers[i].contents()) - } else { - require.Equal(t, "hello", string(readers[i].contents())) - } - } - - // perform another write, verify there's still no errors - n, err = w.Write([]byte(" world")) - require.Equal(t, 6, n) - require.NoError(t, err) -} - -type bufReader struct { - ctx context.Context - r *io.PipeReader - mu sync.Mutex - err error - data []byte - writeSync chan struct{} -} - -func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader { - b := &bufReader{ - ctx: ctx, - r: r, - writeSync: make(chan struct{}), - } - go b.consume() - return b -} - -func (b *bufReader) waitForWrite(t testing.TB) { - t.Helper() - select { - case <-b.writeSync: - return - case <-time.After(50 * time.Millisecond): - t.Fatal("timed out waiting for write") - } -} - -func (b *bufReader) consume() { - defer close(b.writeSync) - for { - buf := make([]byte, 512) - n, err := b.r.Read(buf) - if n != 0 { - b.mu.Lock() - b.data = append(b.data, buf[:n]...) - b.mu.Unlock() - } - if errors.Is(err, io.EOF) { - return - } - if err != nil { - b.mu.Lock() - b.err = err - b.mu.Unlock() - return - } - // prevent goroutine leak, tie lifetime to the test - select { - case b.writeSync <- struct{}{}: - case <-b.ctx.Done(): - return - } - } -} - -func (b *bufReader) contents() []byte { - b.mu.Lock() - defer b.mu.Unlock() - return b.data -} - -func (b *bufReader) reset() { - b.mu.Lock() - defer b.mu.Unlock() - b.data = nil -} - -func (b *bufReader) error() error { - b.mu.Lock() - defer b.mu.Unlock() - return b.err -} diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 3673c4355..2e783026e 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -46,21 +46,23 @@ type fileEvent struct { Action types.WatchAction } -// getSyncImplementation returns the the tar-based syncer unless it has been explicitly -// disabled with `COMPOSE_EXPERIMENTAL_WATCH_TAR=0`. Note that the absence of the env -// var means enabled. -func (s *composeService) getSyncImplementation(project *types.Project) sync.Syncer { +// getSyncImplementation returns an appropriate sync implementation for the +// project. +// +// Currently, an implementation that batches files and transfers them using +// the Moby `Untar` API. +func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) { var useTar bool if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok { useTar, _ = strconv.ParseBool(useTarEnv) } else { useTar = true } - if useTar { - return sync.NewTar(project.Name, tarDockerClient{s: s}) + if !useTar { + return nil, errors.New("no available sync implementation") } - return sync.NewDockerCopy(project.Name, s, s.stdinfo()) + return sync.NewTar(project.Name, tarDockerClient{s: s}), nil } func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo @@ -68,7 +70,10 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv if project, err = project.WithSelectedServices(services); err != nil { return err } - syncer := s.getSyncImplementation(project) + syncer, err := s.getSyncImplementation(project) + if err != nil { + return err + } eg, ctx := errgroup.WithContext(ctx) watching := false for i := range project.Services { diff --git a/pkg/e2e/watch_test.go b/pkg/e2e/watch_test.go index 0740e98c2..677b080a0 100644 --- a/pkg/e2e/watch_test.go +++ b/pkg/e2e/watch_test.go @@ -21,7 +21,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "strings" "sync/atomic" "testing" @@ -38,23 +37,12 @@ func TestWatch(t *testing.T) { t.Skip("Skipping watch tests until we can figure out why they are flaky/failing") services := []string{"alpine", "busybox", "debian"} - t.Run("docker cp", func(t *testing.T) { - for _, svcName := range services { - t.Run(svcName, func(t *testing.T) { - t.Helper() - doTest(t, svcName, false) - }) - } - }) - - t.Run("tar", func(t *testing.T) { - for _, svcName := range services { - t.Run(svcName, func(t *testing.T) { - t.Helper() - doTest(t, svcName, true) - }) - } - }) + for _, svcName := range services { + t.Run(svcName, func(t *testing.T) { + t.Helper() + doTest(t, svcName) + }) + } } func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) { @@ -150,8 +138,9 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) { } -// NOTE: these tests all share a single Compose file but are safe to run concurrently -func doTest(t *testing.T, svcName string, tarSync bool) { +// NOTE: these tests all share a single Compose file but are safe to run +// concurrently (though that's not recommended). +func doTest(t *testing.T, svcName string) { tmpdir := t.TempDir() dataDir := filepath.Join(tmpdir, "data") configDir := filepath.Join(tmpdir, "config") @@ -171,13 +160,9 @@ func doTest(t *testing.T, svcName string, tarSync bool) { CopyFile(t, filepath.Join("fixtures", "watch", "compose.yaml"), composeFilePath) projName := "e2e-watch-" + svcName - if tarSync { - projName += "-tar" - } env := []string{ "COMPOSE_FILE=" + composeFilePath, "COMPOSE_PROJECT_NAME=" + projName, - "COMPOSE_EXPERIMENTAL_WATCH_TAR=" + strconv.FormatBool(tarSync), } cli := NewCLI(t, WithEnv(env...))