From efd44de1b7ec1067de415cd0e3a7d4a911f84b28 Mon Sep 17 00:00:00 2001
From: Milas Bowman
Date: Thu, 3 Aug 2023 14:52:39 -0400
Subject: [PATCH] watch: support multiple containers for tar implementation (#10860)

Support services with scale > 1 for the tar watch sync.

Add a "lossy" multi-writer specific to pipes that writes the tar data to
each `io.PipeWriter`, which is connected to `stdin` for the `tar` process
being exec'd in the container.

The data is written serially to each writer. This could be adjusted to do
concurrent writes, but that would rapidly increase the I/O load, so it is
not done here - in general, 99% of the time you'll be developing (and thus
using watch/sync) with a single replica of a service.

If a write fails, the corresponding `io.PipeWriter` is removed from the
active set and closed with an error. This means that a failed copy to one
container won't stop writes to the others that are succeeding. Of course,
they will still be in an inconsistent state afterwards, but that's a
different problem.

Signed-off-by: Milas Bowman
---
 internal/sync/tar.go         |  21 ++++-
 internal/sync/writer.go      |  91 +++++++++++++++++++++
 internal/sync/writer_test.go | 152 +++++++++++++++++++++++++++++++++++
 3 files changed, 260 insertions(+), 4 deletions(-)
 create mode 100644 internal/sync/writer.go
 create mode 100644 internal/sync/writer_test.go

diff --git a/internal/sync/tar.go b/internal/sync/tar.go
index 3296783ee..ff3990ebb 100644
--- a/internal/sync/tar.go
+++ b/internal/sync/tar.go
@@ -80,9 +80,6 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
 		}
 	}
-	// TODO: this can't be read from multiple times
-	tarReader := tarArchive(pathsToCopy)
-
 	var deleteCmd []string
 	if len(pathsToDelete) != 0 {
 		deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...)
 	}
@@ -90,20 +87,36 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
 	copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"}
 
 	var eg multierror.Group
+	writers := make([]*io.PipeWriter, len(containers))
 	for i := range containers {
 		containerID := containers[i].ID
+		r, w := io.Pipe()
+		writers[i] = w
 		eg.Go(func() error {
 			if len(deleteCmd) != 0 {
 				if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil {
 					return fmt.Errorf("deleting paths in %s: %w", containerID, err)
 				}
 			}
-			if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil {
+			if err := t.client.Exec(ctx, containerID, copyCmd, r); err != nil {
 				return fmt.Errorf("copying files to %s: %w", containerID, err)
 			}
 			return nil
 		})
 	}
+
+	multiWriter := newLossyMultiWriter(writers...)
+	tarReader := tarArchive(pathsToCopy)
+	defer func() {
+		_ = tarReader.Close()
+		multiWriter.Close()
+	}()
+	_, err = io.Copy(multiWriter, tarReader)
+	if err != nil {
+		return err
+	}
+	multiWriter.Close()
+
 	return eg.Wait().ErrorOrNil()
 }
 
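The wiring above is easier to see in isolation. What follows is a hypothetical, self-contained sketch (not part of this patch) of the same fan-out pattern, assuming the newLossyMultiWriter introduced in internal/sync/writer.go below: a single source stream is written once into N pipes, and each pipe is drained by its own consumer goroutine, just as Sync feeds one tar archive to every container's stdin. The name fanOutSketch and the fake payload are illustrative only; the consumer goroutines stand in for the exec'd `tar -x` processes.

package sync

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// fanOutSketch is a hypothetical illustration, not part of this patch:
// one source stream is written once and delivered to N consumers through
// io.Pipe, mirroring how Sync streams a single tar archive to the stdin
// of the tar process exec'd in each container.
func fanOutSketch() {
	const replicas = 3 // stands in for len(containers)

	var wg sync.WaitGroup
	writers := make([]*io.PipeWriter, replicas)
	for i := 0; i < replicas; i++ {
		r, w := io.Pipe()
		writers[i] = w
		wg.Add(1)
		go func(id int, r *io.PipeReader) {
			defer wg.Done()
			// each consumer drains its own read end, like the exec'd tar process
			data, err := io.ReadAll(r)
			fmt.Printf("consumer %d received %d bytes (err=%v)\n", id, len(data), err)
		}(i, r)
	}

	src := strings.NewReader("pretend this is tar data") // stands in for tarArchive(pathsToCopy)
	mw := newLossyMultiWriter(writers...)
	if _, err := io.Copy(mw, src); err != nil {
		fmt.Println("copy failed:", err)
	}
	mw.Close() // readers see EOF and the consumers finish
	wg.Wait()
}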
diff --git a/internal/sync/writer.go b/internal/sync/writer.go
new file mode 100644
index 000000000..f5c182d1b
--- /dev/null
+++ b/internal/sync/writer.go
@@ -0,0 +1,91 @@
+/*
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"errors"
+	"io"
+)
+
+// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter
+// instances.
+//
+// If a writer fails during a Write call, the write side of the pipe is
+// closed with the error and no subsequent attempts are made to write to
+// the pipe.
+//
+// If all writers fail during a write, an error is returned.
+//
+// On Close, any remaining writers are closed.
+type lossyMultiWriter struct {
+	writers []*io.PipeWriter
+}
+
+// newLossyMultiWriter creates a new writer that *attempts* to tee all data
+// written to it to the provided io.PipeWriter instances. Rather than
+// failing a write operation if any writer fails, writes only fail if there
+// are no more valid writers. Otherwise, errors for specific writers are
+// propagated via CloseWithError.
+func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter {
+	return &lossyMultiWriter{
+		writers: writers,
+	}
+}
+
+// Write writes to each writer that is still active (i.e. has not
+// failed/encountered an error on write).
+//
+// If a writer encounters an error during the write, the write side of the
+// pipe is closed with the error and no subsequent attempts will be made to
+// write to that writer.
+//
+// An error is only returned from this function if ALL writers have failed.
+func (l *lossyMultiWriter) Write(p []byte) (int, error) {
+	// NOTE: this function iterates backwards so that it can
+	// safely remove elements during the loop
+	for i := len(l.writers) - 1; i >= 0; i-- {
+		written, err := l.writers[i].Write(p)
+		if err == nil && written != len(p) {
+			err = io.ErrShortWrite
+		}
+		if err != nil {
+			// pipe writer close cannot fail
+			_ = l.writers[i].CloseWithError(err)
+			l.writers = append(l.writers[:i], l.writers[i+1:]...)
+		}
+	}
+
+	if len(l.writers) == 0 {
+		return 0, errors.New("no writers remaining")
+	}
+
+	return len(p), nil
+}
+
+// Close closes any still open (non-failed) writers.
+//
+// Failed writers have already been closed with an error.
+func (l *lossyMultiWriter) Close() {
+	for i := range l.writers {
+		// pipe writer close cannot fail
+		_ = l.writers[i].Close()
+	}
+}
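The failure semantics described in the doc comments are the interesting part, so here is a second hypothetical example (again, not part of this patch) written against the code above. When one pipe's read side goes away, that writer is closed with the error and dropped from the set, while Write keeps reporting success as long as at least one healthy writer remains:

package sync

import (
	"errors"
	"fmt"
	"io"
)

// ExampleLossyMultiWriter is illustrative only and not included in this
// patch: one pipe's read side fails, so its writer is dropped and closed
// with the error, yet Write still succeeds for the surviving pipe.
func ExampleLossyMultiWriter() {
	r1, w1 := io.Pipe()
	r2, w2 := io.Pipe()

	// a healthy consumer keeps draining its pipe
	go func() { _, _ = io.Copy(io.Discard, r2) }()
	// simulate a dead consumer by closing the read side with an error
	_ = r1.CloseWithError(errors.New("container went away"))

	mw := newLossyMultiWriter(w1, w2)
	_, err := mw.Write([]byte("payload")) // w1 fails and is dropped; w2 succeeds
	fmt.Println(err)
	mw.Close()
	// Output: <nil>
}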
diff --git a/internal/sync/writer_test.go b/internal/sync/writer_test.go
new file mode 100644
index 000000000..1e5c13d40
--- /dev/null
+++ b/internal/sync/writer_test.go
@@ -0,0 +1,152 @@
+/*
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"context"
+	"io"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLossyMultiWriter(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	const count = 5
+	readers := make([]*bufReader, count)
+	writers := make([]*io.PipeWriter, count)
+	for i := 0; i < count; i++ {
+		r, w := io.Pipe()
+		readers[i] = newBufReader(ctx, r)
+		writers[i] = w
+	}
+
+	w := newLossyMultiWriter(writers...)
+	t.Cleanup(w.Close)
+	n, err := w.Write([]byte("hello world"))
+	require.Equal(t, 11, n)
+	require.NoError(t, err)
+	for i := range readers {
+		readers[i].waitForWrite(t)
+		require.Equal(t, "hello world", string(readers[i].contents()))
+		readers[i].reset()
+	}
+
+	// even if a writer fails (in this case simulated by closing the
+	// receiving end of the pipe), write operations should continue to
+	// return nil error but the writer should be closed with an error
+	const failIndex = 3
+	require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
+	n, err = w.Write([]byte("hello"))
+	require.Equal(t, 5, n)
+	require.NoError(t, err)
+	for i := range readers {
+		readers[i].waitForWrite(t)
+		if i == failIndex {
+			err := readers[i].error()
+			require.EqualError(t, err, "io: read/write on closed pipe")
+			require.Empty(t, readers[i].contents())
+		} else {
+			require.Equal(t, "hello", string(readers[i].contents()))
+		}
+	}
+
+	// perform another write, verify there's still no errors
+	n, err = w.Write([]byte(" world"))
+	require.Equal(t, 6, n)
+	require.NoError(t, err)
+}
+
+type bufReader struct {
+	ctx       context.Context
+	r         *io.PipeReader
+	mu        sync.Mutex
+	err       error
+	data      []byte
+	writeSync chan struct{}
+}
+
+func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
+	b := &bufReader{
+		ctx:       ctx,
+		r:         r,
+		writeSync: make(chan struct{}),
+	}
+	go b.consume()
+	return b
+}
+
+func (b *bufReader) waitForWrite(t testing.TB) {
+	t.Helper()
+	select {
+	case <-b.writeSync:
+		return
+	case <-time.After(50 * time.Millisecond):
+		t.Fatal("timed out waiting for write")
+	}
+}
+
+func (b *bufReader) consume() {
+	defer close(b.writeSync)
+	for {
+		buf := make([]byte, 512)
+		n, err := b.r.Read(buf)
+		if n != 0 {
+			b.mu.Lock()
+			b.data = append(b.data, buf[:n]...)
+			b.mu.Unlock()
+		}
+		if err == io.EOF {
+			return
+		}
+		if err != nil {
+			b.mu.Lock()
+			b.err = err
+			b.mu.Unlock()
+			return
+		}
+		// prevent goroutine leak, tie lifetime to the test
+		select {
+		case b.writeSync <- struct{}{}:
+		case <-b.ctx.Done():
+			return
+		}
+	}
+}
+
+func (b *bufReader) contents() []byte {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.data
+}
+
+func (b *bufReader) reset() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.data = nil
+}
+
+func (b *bufReader) error() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.err
+}
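To run just the new test while reviewing, the usual Go tooling works from the repository root; something like:

go test -v -run TestLossyMultiWriter ./internal/sync

Two details of the harness are worth noting: waitForWrite bounds each assertion with a 50ms timeout so a hung pipe fails fast instead of stalling the suite, and the ctx.Done() case in consume ties every reader goroutine's lifetime to the test, so a write that never arrives can't leak a goroutine.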