fix deadlock collecting large logs

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2024-02-13 15:13:28 +01:00 committed by Nicolas De loof
parent 894ab41c3b
commit 07bda5960e
3 changed files with 100 additions and 60 deletions

View File

@ -32,18 +32,19 @@ type logPrinter interface {
} }
type printer struct { type printer struct {
sync.Mutex
queue chan api.ContainerEvent queue chan api.ContainerEvent
consumer api.LogConsumer consumer api.LogConsumer
stopped bool stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue
stop sync.Once
} }
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer // newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
func newLogPrinter(consumer api.LogConsumer) logPrinter { func newLogPrinter(consumer api.LogConsumer) logPrinter {
queue := make(chan api.ContainerEvent)
printer := printer{ printer := printer{
consumer: consumer, consumer: consumer,
queue: queue, queue: make(chan api.ContainerEvent),
stopCh: make(chan struct{}),
stop: sync.Once{},
} }
return &printer return &printer
} }
@ -54,24 +55,27 @@ func (p *printer) Cancel() {
} }
func (p *printer) Stop() { func (p *printer) Stop() {
p.Lock() p.stop.Do(func() {
defer p.Unlock() close(p.stopCh)
if !p.stopped { for {
// only close if this is the first call to stop select {
p.stopped = true case <-p.queue:
close(p.queue) // purge the queue to free producers goroutines
} // p.queue will be garbage collected
default:
return
}
}
})
} }
func (p *printer) HandleEvent(event api.ContainerEvent) { func (p *printer) HandleEvent(event api.ContainerEvent) {
p.Lock() select {
defer p.Unlock() case <-p.stopCh:
if p.stopped {
// prevent deadlocking, if the printer is done, there's no reader for
// queue, so this write could block indefinitely
return return
default:
p.queue <- event
} }
p.queue <- event
} }
//nolint:gocyclo //nolint:gocyclo
@ -80,58 +84,64 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error
aborting bool aborting bool
exitCode int exitCode int
) )
defer p.Stop()
containers := map[string]struct{}{} containers := map[string]struct{}{}
for event := range p.queue { for {
container, id := event.Container, event.ID select {
switch event.Type { case <-p.stopCh:
case api.UserCancel: return exitCode, nil
aborting = true case event := <-p.queue:
case api.ContainerEventAttach: container, id := event.Container, event.ID
if _, ok := containers[id]; ok { switch event.Type {
continue case api.UserCancel:
} aborting = true
containers[id] = struct{}{} case api.ContainerEventAttach:
p.consumer.Register(container) if _, ok := containers[id]; ok {
case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated: continue
if !event.Restarting { }
delete(containers, id) containers[id] = struct{}{}
} p.consumer.Register(container)
if !aborting { case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated:
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode)) if !event.Restarting {
if event.Type == api.ContainerEventRecreated { delete(containers, id)
p.consumer.Status(container, "has been recreated")
} }
}
if cascadeStop {
if !aborting { if !aborting {
aborting = true p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
err := stopFn() if event.Type == api.ContainerEventRecreated {
if err != nil { p.consumer.Status(container, "has been recreated")
return 0, err
} }
} }
if event.Type == api.ContainerEventExit { if cascadeStop {
if exitCodeFrom == "" { if !aborting {
exitCodeFrom = event.Service aborting = true
err := stopFn()
if err != nil {
return 0, err
}
} }
if exitCodeFrom == event.Service { if event.Type == api.ContainerEventExit {
exitCode = event.ExitCode if exitCodeFrom == "" {
exitCodeFrom = event.Service
}
if exitCodeFrom == event.Service {
exitCode = event.ExitCode
}
} }
} }
} if len(containers) == 0 {
if len(containers) == 0 { // Last container terminated, done
// Last container terminated, done return exitCode, nil
return exitCode, nil }
} case api.ContainerEventLog:
case api.ContainerEventLog: if !aborting {
if !aborting { p.consumer.Log(container, event.Line)
p.consumer.Log(container, event.Line) }
} case api.ContainerEventErr:
case api.ContainerEventErr: if !aborting {
if !aborting { p.consumer.Err(container, event.Line)
p.consumer.Err(container, event.Line) }
} }
} }
} }
return exitCode, nil
} }

View File

@ -0,0 +1,6 @@
# Single-service project whose container prints a host file to stdout and exits.
# FILE must be set in the environment to the host path of the file to mount.
services:
  test:
    image: alpine
    command: cat /text_file.txt
    volumes:
      - ${FILE}:/text_file.txt

View File

@ -17,6 +17,10 @@
package e2e package e2e
import ( import (
"fmt"
"io"
"os"
"path/filepath"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -96,6 +100,26 @@ func TestLocalComposeLogsFollow(t *testing.T) {
poll.WaitOn(t, expectOutput(res, "ping-2 "), poll.WithDelay(100*time.Millisecond), poll.WithTimeout(20*time.Second)) poll.WaitOn(t, expectOutput(res, "ping-2 "), poll.WithDelay(100*time.Millisecond), poll.WithTimeout(20*time.Second))
} }
// TestLocalComposeLargeLogs checks that `up --abort-on-container-exit`
// completes and reports the container's exit status when the service prints
// a very large amount of output (a 300,000-line file dumped with `cat`).
func TestLocalComposeLargeLogs(t *testing.T) {
	const (
		projectName = "compose-e2e-large_logs"
		lineCount   = 300_000
		// Upper bound on the length of one generated line, used to pre-size the buffer.
		maxLineLen = 48
	)

	file := filepath.Join(t.TempDir(), "large.txt")
	c := NewCLI(t, WithEnv("FILE="+file))
	t.Cleanup(func() {
		c.RunDockerComposeCmd(t, "--project-name", projectName, "down")
	})

	// Build the whole payload in memory and write it in one call rather than
	// issuing one unbuffered write per line.
	var content strings.Builder
	content.Grow(lineCount * maxLineLen)
	for i := 0; i < lineCount; i++ {
		// Writes to a strings.Builder never fail, so the error is not checked.
		fmt.Fprintf(&content, "This is line %d in a laaaarge text file\n", i)
	}

	f, err := os.Create(file)
	assert.NilError(t, err)
	// Close before asserting so the handle is released even when the write
	// failed and the assertion aborts the test.
	_, writeErr := io.WriteString(f, content.String())
	closeErr := f.Close()
	assert.NilError(t, writeErr)
	assert.NilError(t, closeErr)

	res := c.RunDockerComposeCmd(t, "-f", "./fixtures/logs-test/cat.yaml", "--project-name", projectName, "up", "--abort-on-container-exit")
	res.Assert(t, icmd.Expected{Out: "test-1 exited with code 0"})
}
func expectOutput(res *icmd.Result, expected string) func(t poll.LogT) poll.Result { func expectOutput(res *icmd.Result, expected string) func(t poll.LogT) poll.Result {
return func(t poll.LogT) poll.Result { return func(t poll.LogT) poll.Result {
if strings.Contains(res.Stdout(), expected) { if strings.Contains(res.Stdout(), expected) {