Merge pull request #11497 from milas/rm-docker-cp-syncer

chore(watch): remove old `docker cp` implementation
This commit is contained in:
Guillaume Lours 2024-02-14 21:26:05 +01:00 committed by GitHub
commit a220043ca0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 22 additions and 379 deletions

View File

@ -1,104 +0,0 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"github.com/compose-spec/compose-go/v2/types"
"github.com/docker/compose/v2/pkg/api"
"github.com/sirupsen/logrus"
)
type ComposeClient interface {
Exec(ctx context.Context, projectName string, options api.RunOptions) (int, error)
Copy(ctx context.Context, projectName string, options api.CopyOptions) error
}
type DockerCopy struct {
client ComposeClient
projectName string
infoWriter io.Writer
}
var _ Syncer = &DockerCopy{}
func NewDockerCopy(projectName string, client ComposeClient, infoWriter io.Writer) *DockerCopy {
return &DockerCopy{
projectName: projectName,
client: client,
infoWriter: infoWriter,
}
}
func (d *DockerCopy) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
var errs []error
for i := range paths {
if err := d.sync(ctx, service, paths[i]); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, pathMapping PathMapping) error {
scale := service.GetScale()
if fi, statErr := os.Stat(pathMapping.HostPath); statErr == nil {
if fi.IsDir() {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: service.Name,
Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
}
}
fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
} else {
err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
Source: pathMapping.HostPath,
Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath),
})
if err != nil {
return err
}
fmt.Fprintf(d.infoWriter, "%s updated\n", pathMapping.ContainerPath)
}
} else if errors.Is(statErr, fs.ErrNotExist) {
for i := 1; i <= scale; i++ {
_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
Service: service.Name,
Command: []string{"rm", "-rf", pathMapping.ContainerPath},
Index: i,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
}
}
fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)
}
return nil
}

View File

@ -1,91 +0,0 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"errors"
"io"
)
// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter
// instances.
//
// If a writer fails during a Write call, the write-side of the pipe is then
// closed with the error and no subsequent attempts are made to write to the
// pipe.
//
// If all writers fail during a write, an error is returned.
//
// On Close, any remaining writers are closed.
type lossyMultiWriter struct {
writers []*io.PipeWriter
}
// newLossyMultiWriter creates a new writer that *attempts* to tee all data written to it to the provided io.PipeWriter
// instances. Rather than failing a write operation if any writer fails, writes only fail if there are no more valid
// writers. Otherwise, errors for specific writers are propagated via CloseWithError.
func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter {
// reverse the writers because during the write we iterate
// backwards, so this way we'll end up writing in the same
// order as the writers were passed to us
writers = append([]*io.PipeWriter(nil), writers...)
for i, j := 0, len(writers)-1; i < j; i, j = i+1, j-1 {
writers[i], writers[j] = writers[j], writers[i]
}
return &lossyMultiWriter{
writers: writers,
}
}
// Write writes to each writer that is still active (i.e. has not failed/encountered an error on write).
//
// If a writer encounters an error during the write, the write side of the pipe is closed with the error
// and no subsequent attempts will be made to write to that writer.
//
// An error is only returned from this function if ALL writers have failed.
func (l *lossyMultiWriter) Write(p []byte) (int, error) {
// NOTE: this function iterates backwards so that it can
// safely remove elements during the loop
for i := len(l.writers) - 1; i >= 0; i-- {
written, err := l.writers[i].Write(p)
if err == nil && written != len(p) {
err = io.ErrShortWrite
}
if err != nil {
// pipe writer close cannot fail
_ = l.writers[i].CloseWithError(err)
l.writers = append(l.writers[:i], l.writers[i+1:]...)
}
}
if len(l.writers) == 0 {
return 0, errors.New("no writers remaining")
}
return len(p), nil
}
// Close closes any still open (non-failed) writers.
//
// Failed writers have already been closed with an error.
func (l *lossyMultiWriter) Close() {
for i := range l.writers {
// pipe writer close cannot fail
_ = l.writers[i].Close()
}
}

View File

@ -1,152 +0,0 @@
/*
Copyright 2023 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sync
import (
"context"
"errors"
"io"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestLossyMultiWriter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
const count = 5
readers := make([]*bufReader, count)
writers := make([]*io.PipeWriter, count)
for i := 0; i < count; i++ {
r, w := io.Pipe()
readers[i] = newBufReader(ctx, r)
writers[i] = w
}
w := newLossyMultiWriter(writers...)
t.Cleanup(w.Close)
n, err := w.Write([]byte("hello world"))
require.Equal(t, 11, n)
require.NoError(t, err)
for i := range readers {
readers[i].waitForWrite(t)
require.Equal(t, "hello world", string(readers[i].contents()))
readers[i].reset()
}
// even if a writer fails (in this case simulated by closing the receiving end of the pipe),
// write operations should continue to return nil error but the writer should be closed
// with an error
const failIndex = 3
require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
n, err = w.Write([]byte("hello"))
require.Equal(t, 5, n)
require.NoError(t, err)
for i := range readers {
readers[i].waitForWrite(t)
if i == failIndex {
err := readers[i].error()
require.EqualError(t, err, "io: read/write on closed pipe")
require.Empty(t, readers[i].contents())
} else {
require.Equal(t, "hello", string(readers[i].contents()))
}
}
// perform another write, verify there's still no errors
n, err = w.Write([]byte(" world"))
require.Equal(t, 6, n)
require.NoError(t, err)
}
type bufReader struct {
ctx context.Context
r *io.PipeReader
mu sync.Mutex
err error
data []byte
writeSync chan struct{}
}
func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
b := &bufReader{
ctx: ctx,
r: r,
writeSync: make(chan struct{}),
}
go b.consume()
return b
}
func (b *bufReader) waitForWrite(t testing.TB) {
t.Helper()
select {
case <-b.writeSync:
return
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for write")
}
}
func (b *bufReader) consume() {
defer close(b.writeSync)
for {
buf := make([]byte, 512)
n, err := b.r.Read(buf)
if n != 0 {
b.mu.Lock()
b.data = append(b.data, buf[:n]...)
b.mu.Unlock()
}
if errors.Is(err, io.EOF) {
return
}
if err != nil {
b.mu.Lock()
b.err = err
b.mu.Unlock()
return
}
// prevent goroutine leak, tie lifetime to the test
select {
case b.writeSync <- struct{}{}:
case <-b.ctx.Done():
return
}
}
}
func (b *bufReader) contents() []byte {
b.mu.Lock()
defer b.mu.Unlock()
return b.data
}
func (b *bufReader) reset() {
b.mu.Lock()
defer b.mu.Unlock()
b.data = nil
}
func (b *bufReader) error() error {
b.mu.Lock()
defer b.mu.Unlock()
return b.err
}

View File

@ -46,21 +46,23 @@ type fileEvent struct {
Action types.WatchAction
}
// getSyncImplementation returns the the tar-based syncer unless it has been explicitly
// disabled with `COMPOSE_EXPERIMENTAL_WATCH_TAR=0`. Note that the absence of the env
// var means enabled.
func (s *composeService) getSyncImplementation(project *types.Project) sync.Syncer {
// getSyncImplementation returns an appropriate sync implementation for the
// project.
//
// Currently, an implementation that batches files and transfers them using
// the Moby `Untar` API.
func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
var useTar bool
if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
useTar, _ = strconv.ParseBool(useTarEnv)
} else {
useTar = true
}
if useTar {
return sync.NewTar(project.Name, tarDockerClient{s: s})
if !useTar {
return nil, errors.New("no available sync implementation")
}
return sync.NewDockerCopy(project.Name, s, s.stdinfo())
return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
}
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
@ -68,7 +70,10 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
if project, err = project.WithSelectedServices(services); err != nil {
return err
}
syncer := s.getSyncImplementation(project)
syncer, err := s.getSyncImplementation(project)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
watching := false
for i := range project.Services {

View File

@ -21,7 +21,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"testing"
@ -38,23 +37,12 @@ func TestWatch(t *testing.T) {
t.Skip("Skipping watch tests until we can figure out why they are flaky/failing")
services := []string{"alpine", "busybox", "debian"}
t.Run("docker cp", func(t *testing.T) {
for _, svcName := range services {
t.Run(svcName, func(t *testing.T) {
t.Helper()
doTest(t, svcName, false)
})
}
})
t.Run("tar", func(t *testing.T) {
for _, svcName := range services {
t.Run(svcName, func(t *testing.T) {
t.Helper()
doTest(t, svcName, true)
})
}
})
for _, svcName := range services {
t.Run(svcName, func(t *testing.T) {
t.Helper()
doTest(t, svcName)
})
}
}
func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
@ -150,8 +138,9 @@ func TestRebuildOnDotEnvWithExternalNetwork(t *testing.T) {
}
// NOTE: these tests all share a single Compose file but are safe to run concurrently
func doTest(t *testing.T, svcName string, tarSync bool) {
// NOTE: these tests all share a single Compose file but are safe to run
// concurrently (though that's not recommended).
func doTest(t *testing.T, svcName string) {
tmpdir := t.TempDir()
dataDir := filepath.Join(tmpdir, "data")
configDir := filepath.Join(tmpdir, "config")
@ -171,13 +160,9 @@ func doTest(t *testing.T, svcName string, tarSync bool) {
CopyFile(t, filepath.Join("fixtures", "watch", "compose.yaml"), composeFilePath)
projName := "e2e-watch-" + svcName
if tarSync {
projName += "-tar"
}
env := []string{
"COMPOSE_FILE=" + composeFilePath,
"COMPOSE_PROJECT_NAME=" + projName,
"COMPOSE_EXPERIMENTAL_WATCH_TAR=" + strconv.FormatBool(tarSync),
}
cli := NewCLI(t, WithEnv(env...))