mirror of https://github.com/docker/compose.git
chore(watch): remove old `docker cp` implementation
This has not been the default for quite a while and required setting an environment variable to revert back. The tar implementation is more performant and addresses several edge cases with the original `docker cp` version, so it's time to fully retire it. The scaffolding for multiple sync implementations remains to support future experimentation here. Signed-off-by: Milas Bowman <milas.bowman@docker.com>
This commit is contained in:
parent
894ab41c3b
commit
d203402998
|
@ -1,104 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2023 Docker Compose CLI authors
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/fs"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/compose-spec/compose-go/v2/types"
|
|
||||||
"github.com/docker/compose/v2/pkg/api"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ComposeClient interface {
|
|
||||||
Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error)
|
|
||||||
|
|
||||||
Copy(ctx context.Context, projectName string, options api.CopyOptions) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type DockerCopy struct {
|
|
||||||
client ComposeClient
|
|
||||||
|
|
||||||
projectName string
|
|
||||||
|
|
||||||
infoWriter io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Syncer = &DockerCopy{}
|
|
||||||
|
|
||||||
func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy {
|
|
||||||
return &DockerCopy{
|
|
||||||
projectName: projectName,
|
|
||||||
client: client,
|
|
||||||
infoWriter: infoWriter,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
|
|
||||||
var errs []error
|
|
||||||
for i := range paths {
|
|
||||||
if err := d.sync(ctx, service, paths[i]); err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return errors.Join(errs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error {
|
|
||||||
scale := service.GetScale()
|
|
||||||
|
|
||||||
if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil {
|
|
||||||
if fi.IsDir() {
|
|
||||||
for i := 1; i <= scale; i++ {
|
|
||||||
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
|
|
||||||
Service: service.Name,
|
|
||||||
Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
|
|
||||||
Index: i,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
|
|
||||||
} else {
|
|
||||||
err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
|
|
||||||
Source: pathMapping.HostPath,
|
|
||||||
Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath)
|
|
||||||
}
|
|
||||||
} else if errors.Is(statErr, fs.ErrNotExist) {
|
|
||||||
for i := 1; i <= scale; i++ {
|
|
||||||
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
|
|
||||||
Service: service.Name,
|
|
||||||
Command: []string{"rm", "-rf", pathMapping.ContainerPath},
|
|
||||||
Index: i,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,91 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2023 Docker Compose CLI authors
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
)
|
|
||||||
|
|
||||||
// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter
|
|
||||||
// instances.
|
|
||||||
//
|
|
||||||
// If a writer fails during a Write call, the write-side of the pipe is then
|
|
||||||
// closed with the error and no subsequent attempts are made to write to the
|
|
||||||
// pipe.
|
|
||||||
//
|
|
||||||
// If all writers fail during a write, an error is returned.
|
|
||||||
//
|
|
||||||
// On Close, any remaining writers are closed.
|
|
||||||
type lossyMultiWriter struct {
|
|
||||||
writers []*io.PipeWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
// newLossyMultiWriter creates a new writer that *attempts* to tee all data written to it to the provided io.PipeWriter
|
|
||||||
// instances. Rather than failing a write operation if any writer fails, writes only fail if there are no more valid
|
|
||||||
// writers. Otherwise, errors for specific writers are propagated via CloseWithError.
|
|
||||||
func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter {
|
|
||||||
// reverse the writers because during the write we iterate
|
|
||||||
// backwards, so this way we'll end up writing in the same
|
|
||||||
// order as the writers were passed to us
|
|
||||||
writers = append([]*io.PipeWriter(nil), writers...)
|
|
||||||
for i, j := 0, len(writers)-1; i < j; i, j = i+1, j-1 {
|
|
||||||
writers[i], writers[j] = writers[j], writers[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
return &lossyMultiWriter{
|
|
||||||
writers: writers,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write writes to each writer that is still active (i.e. has not failed/encountered an error on write).
|
|
||||||
//
|
|
||||||
// If a writer encounters an error during the write, the write side of the pipe is closed with the error
|
|
||||||
// and no subsequent attempts will be made to write to that writer.
|
|
||||||
//
|
|
||||||
// An error is only returned from this function if ALL writers have failed.
|
|
||||||
func (l *lossyMultiWriter) Write(p []byte) (int, error) {
|
|
||||||
// NOTE: this function iterates backwards so that it can
|
|
||||||
// safely remove elements during the loop
|
|
||||||
for i := len(l.writers) - 1; i >= 0; i-- {
|
|
||||||
written, err := l.writers[i].Write(p)
|
|
||||||
if err == nil && written != len(p) {
|
|
||||||
err = io.ErrShortWrite
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
// pipe writer close cannot fail
|
|
||||||
_ = l.writers[i].CloseWithError(err)
|
|
||||||
l.writers = append(l.writers[:i], l.writers[i+1:]...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(l.writers) == 0 {
|
|
||||||
return 0, errors.New("no writers remaining")
|
|
||||||
}
|
|
||||||
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes any still open (non-failed) writers.
|
|
||||||
//
|
|
||||||
// Failed writers have already been closed with an error.
|
|
||||||
func (l *lossyMultiWriter) Close() {
|
|
||||||
for i := range l.writers {
|
|
||||||
// pipe writer close cannot fail
|
|
||||||
_ = l.writers[i].Close()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,152 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2023 Docker Compose CLI authors
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestLossyMultiWriter(t *testing.T) {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
t.Cleanup(cancel)
|
|
||||||
|
|
||||||
const count = 5
|
|
||||||
readers := make([]*bufReader, count)
|
|
||||||
writers := make([]*io.PipeWriter, count)
|
|
||||||
for i := 0; i < count; i++ {
|
|
||||||
r, w := io.Pipe()
|
|
||||||
readers[i] = newBufReader(ctx, r)
|
|
||||||
writers[i] = w
|
|
||||||
}
|
|
||||||
|
|
||||||
w := newLossyMultiWriter(writers...)
|
|
||||||
t.Cleanup(w.Close)
|
|
||||||
n, err := w.Write([]byte("hello world"))
|
|
||||||
require.Equal(t, 11, n)
|
|
||||||
require.NoError(t, err)
|
|
||||||
for i := range readers {
|
|
||||||
readers[i].waitForWrite(t)
|
|
||||||
require.Equal(t, "hello world", string(readers[i].contents()))
|
|
||||||
readers[i].reset()
|
|
||||||
}
|
|
||||||
|
|
||||||
// even if a writer fails (in this case simulated by closing the receiving end of the pipe),
|
|
||||||
// write operations should continue to return nil error but the writer should be closed
|
|
||||||
// with an error
|
|
||||||
const failIndex = 3
|
|
||||||
require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
|
|
||||||
n, err = w.Write([]byte("hello"))
|
|
||||||
require.Equal(t, 5, n)
|
|
||||||
require.NoError(t, err)
|
|
||||||
for i := range readers {
|
|
||||||
readers[i].waitForWrite(t)
|
|
||||||
if i == failIndex {
|
|
||||||
err := readers[i].error()
|
|
||||||
require.EqualError(t, err, "io: read/write on closed pipe")
|
|
||||||
require.Empty(t, readers[i].contents())
|
|
||||||
} else {
|
|
||||||
require.Equal(t, "hello", string(readers[i].contents()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// perform another write, verify there's still no errors
|
|
||||||
n, err = w.Write([]byte(" world"))
|
|
||||||
require.Equal(t, 6, n)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
type bufReader struct {
|
|
||||||
ctx context.Context
|
|
||||||
r *io.PipeReader
|
|
||||||
mu sync.Mutex
|
|
||||||
err error
|
|
||||||
data []byte
|
|
||||||
writeSync chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
|
|
||||||
b := &bufReader{
|
|
||||||
ctx: ctx,
|
|
||||||
r: r,
|
|
||||||
writeSync: make(chan struct{}),
|
|
||||||
}
|
|
||||||
go b.consume()
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bufReader) waitForWrite(t testing.TB) {
|
|
||||||
t.Helper()
|
|
||||||
select {
|
|
||||||
case <-b.writeSync:
|
|
||||||
return
|
|
||||||
case <-time.After(50 * time.Millisecond):
|
|
||||||
t.Fatal("timed out waiting for write")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bufReader) consume() {
|
|
||||||
defer close(b.writeSync)
|
|
||||||
for {
|
|
||||||
buf := make([]byte, 512)
|
|
||||||
n, err := b.r.Read(buf)
|
|
||||||
if n != 0 {
|
|
||||||
b.mu.Lock()
|
|
||||||
b.data = append(b.data, buf[:n]...)
|
|
||||||
b.mu.Unlock()
|
|
||||||
}
|
|
||||||
if errors.Is(err, io.EOF) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
b.mu.Lock()
|
|
||||||
b.err = err
|
|
||||||
b.mu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// prevent goroutine leak, tie lifetime to the test
|
|
||||||
select {
|
|
||||||
case b.writeSync <- struct{}{}:
|
|
||||||
case <-b.ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bufReader) contents() []byte {
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
return b.data
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bufReader) reset() {
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
b.data = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bufReader) error() error {
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
return b.err
|
|
||||||
}
|
|
|
@ -46,21 +46,23 @@ type fileEvent struct {
|
||||||
Action types.WatchAction
|
Action types.WatchAction
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSyncImplementation returns the the tar-based syncer unless it has been explicitly
|
// getSyncImplementation returns an appropriate sync implementation for the
|
||||||
// disabled with `COMPOSE_EXPERIMENTAL_WATCH_TAR=0`. Note that the absence of the env
|
// project.
|
||||||
// var means enabled.
|
//
|
||||||
func (s *composeService) getSyncImplementation(project *types.Project) sync.Syncer {
|
// Currently, an implementation that batches files and transfers them using
|
||||||
|
// the Moby `Untar` API.
|
||||||
|
func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
|
||||||
var useTar bool
|
var useTar bool
|
||||||
if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
|
if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
|
||||||
useTar, _ = strconv.ParseBool(useTarEnv)
|
useTar, _ = strconv.ParseBool(useTarEnv)
|
||||||
} else {
|
} else {
|
||||||
useTar = true
|
useTar = true
|
||||||
}
|
}
|
||||||
if useTar {
|
if !useTar {
|
||||||
return sync.NewTar(project.Name, tarDockerClient{s: s})
|
return nil, errors.New("no available sync implementation")
|
||||||
}
|
}
|
||||||
|
|
||||||
return sync.NewDockerCopy(project.Name, s, s.stdinfo())
|
return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
|
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
|
||||||
|
@ -68,7 +70,10 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
|
||||||
if project, err = project.WithSelectedServices(services); err != nil {
|
if project, err = project.WithSelectedServices(services); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
syncer := s.getSyncImplementation(project)
|
syncer, err := s.getSyncImplementation(project)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
watching := false
|
watching := false
|
||||||
for i := range project.Services {
|
for i := range project.Services {
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -38,23 +37,12 @@ func TestWatch(t *testing.T) {
|
||||||
t.Skip("Skipping watch tests until we can figure out why they are flaky/failing")
|
t.Skip("Skipping watch tests until we can figure out why they are flaky/failing")
|
||||||
|
|
||||||
services := []string{"alpine", "busybox", "debian"}
|
services := []string{"alpine", "busybox", "debian"}
|
||||||
t.Run("docker cp", func(t *testing.T) {
|
|
||||||
for _, svcName := range services {
|
for _, svcName := range services {
|
||||||
t.Run(svcName, func(t *testing.T) {
|
t.Run(svcName, func(t *testing.T) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
doTest(t, svcName, false)
|
doTest(t, svcName)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("tar", func(t *testing.T) {
|
|
||||||
for _, svcName := range services {
|
|
||||||
t.Run(svcName, func(t *testing.T) {
|
|
||||||
t.Helper()
|
|
||||||
doTest(t, svcName, true)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
|
func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
|
||||||
|
@ -150,8 +138,9 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: these tests all share a single Compose file but are safe to run concurrently
|
// NOTE: these tests all share a single Compose file but are safe to run
|
||||||
func doTest(t *testing.T, svcName string, tarSync bool) {
|
// concurrently (though that's not recommended).
|
||||||
|
func doTest(t *testing.T, svcName string) {
|
||||||
tmpdir := t.TempDir()
|
tmpdir := t.TempDir()
|
||||||
dataDir := filepath.Join(tmpdir, "data")
|
dataDir := filepath.Join(tmpdir, "data")
|
||||||
configDir := filepath.Join(tmpdir, "config")
|
configDir := filepath.Join(tmpdir, "config")
|
||||||
|
@ -171,13 +160,9 @@ func doTest(t *testing.T, svcName string, tarSync bool) {
|
||||||
CopyFile(t, filepath.Join("fixtures", "watch", "compose.yaml"), composeFilePath)
|
CopyFile(t, filepath.Join("fixtures", "watch", "compose.yaml"), composeFilePath)
|
||||||
|
|
||||||
projName := "e2e-watch-" + svcName
|
projName := "e2e-watch-" + svcName
|
||||||
if tarSync {
|
|
||||||
projName += "-tar"
|
|
||||||
}
|
|
||||||
env := []string{
|
env := []string{
|
||||||
"COMPOSE_FILE=" + composeFilePath,
|
"COMPOSE_FILE=" + composeFilePath,
|
||||||
"COMPOSE_PROJECT_NAME=" + projName,
|
"COMPOSE_PROJECT_NAME=" + projName,
|
||||||
"COMPOSE_EXPERIMENTAL_WATCH_TAR=" + strconv.FormatBool(tarSync),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cli := NewCLI(t, WithEnv(env...))
|
cli := NewCLI(t, WithEnv(env...))
|
||||||
|
|
Loading…
Reference in New Issue