diff --git a/cmd/formatter/logs.go b/cmd/formatter/logs.go index 0e2d206c8..2faa77b65 100644 --- a/cmd/formatter/logs.go +++ b/cmd/formatter/logs.go @@ -56,10 +56,6 @@ func NewLogConsumer(ctx context.Context, stdout, stderr io.Writer, color, prefix } } -func (l *logConsumer) Register(name string) { - l.register(name) -} - func (l *logConsumer) register(name string) *presenter { var p *presenter root, _, found := strings.Cut(name, " ") diff --git a/pkg/api/api.go b/pkg/api/api.go index 359893bb3..0bec86a6e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -649,7 +649,6 @@ type LogConsumer interface { Log(containerName, message string) Err(containerName, message string) Status(container, msg string) - Register(container string) } // ContainerEventListener is a callback to process ContainerEvent from services @@ -657,16 +656,18 @@ type ContainerEventListener func(event ContainerEvent) // ContainerEvent notify an event has been collected on source container implementing Service type ContainerEvent struct { - Type int - // Container is the name of the container _without the project prefix_. + Type int + Time int64 + Container *ContainerSummary + // Source is the name of the container _without the project prefix_. // // This is only suitable for display purposes within Compose, as it's // not guaranteed to be unique across services. - Container string - ID string - Service string - Line string - // ContainerEventExit only + Source string + ID string + Service string + Line string + // ContainerEventExited only ExitCode int Restarting bool } @@ -676,17 +677,19 @@ const ( ContainerEventLog = iota // ContainerEventErr is a ContainerEvent of type log on stderr. Line is set ContainerEventErr - // ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container - ContainerEventAttach + // ContainerEventStarted let consumer know a container has been started + ContainerEventStarted + // ContainerEventRestarted let consumer know a container has been restarted + ContainerEventRestarted // ContainerEventStopped is a ContainerEvent of type stopped. ContainerEventStopped + // ContainerEventCreated let consumer know a new container has been created + ContainerEventCreated // ContainerEventRecreated let consumer know container stopped but his being replaced ContainerEventRecreated - // ContainerEventExit is a ContainerEvent of type exit. ExitCode is set - ContainerEventExit + // ContainerEventExited is a ContainerEvent of type exit. ExitCode is set + ContainerEventExited // UserCancel user cancelled compose up, we are stopping containers - UserCancel - // HookEventLog is a ContainerEvent of type log on stdout by service hook HookEventLog ) diff --git a/pkg/compose/attach.go b/pkg/compose/attach.go index 8c17e056a..897c47331 100644 --- a/pkg/compose/attach.go +++ b/pkg/compose/attach.go @@ -61,41 +61,37 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis } func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error { - serviceName := container.Labels[api.ServiceLabel] - containerName := getContainerNameWithoutProject(container) + service := container.Labels[api.ServiceLabel] + name := getContainerNameWithoutProject(container) + return s.doAttachContainer(ctx, service, container.ID, name, listener) +} - listener(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: containerName, - ID: container.ID, - Service: serviceName, - }) - - wOut := utils.GetWriter(func(line string) { - listener(api.ContainerEvent{ - Type: api.ContainerEventLog, - Container: containerName, - ID: container.ID, - Service: serviceName, - Line: line, - }) - }) - wErr := utils.GetWriter(func(line string) { - listener(api.ContainerEvent{ - Type: api.ContainerEventErr, - Container: containerName, - ID: container.ID, - Service: serviceName, - Line: line, - }) - }) - - inspect, err := s.apiClient().ContainerInspect(ctx, container.ID) +func (s *composeService) doAttachContainer(ctx context.Context, service, id, name string, listener api.ContainerEventListener) error { + inspect, err := s.apiClient().ContainerInspect(ctx, id) if err != nil { return err } - _, _, err = s.attachContainerStreams(ctx, container.ID, inspect.Config.Tty, nil, wOut, wErr) + wOut := utils.GetWriter(func(line string) { + listener(api.ContainerEvent{ + Type: api.ContainerEventLog, + Source: name, + ID: id, + Service: service, + Line: line, + }) + }) + wErr := utils.GetWriter(func(line string) { + listener(api.ContainerEvent{ + Type: api.ContainerEventErr, + Source: name, + ID: id, + Service: service, + Line: line, + }) + }) + + _, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr) return err } diff --git a/pkg/compose/containers.go b/pkg/compose/containers.go index ebf70d013..598cc2a23 100644 --- a/pkg/compose/containers.go +++ b/pkg/compose/containers.go @@ -128,12 +128,6 @@ func isService(services ...string) containerPredicate { } } -func isRunning() containerPredicate { - return func(c container.Summary) bool { - return c.State == "running" - } -} - // isOrphaned is a predicate to select containers without a matching service definition in compose project func isOrphaned(project *types.Project) containerPredicate { services := append(project.ServiceNames(), project.DisabledServiceNames()...) diff --git a/pkg/compose/hook.go b/pkg/compose/hook.go index 6bd3f84bf..dd02de640 100644 --- a/pkg/compose/hook.go +++ b/pkg/compose/hook.go @@ -32,11 +32,11 @@ import ( func (s composeService) runHook(ctx context.Context, ctr container.Summary, service types.ServiceConfig, hook types.ServiceHook, listener api.ContainerEventListener) error { wOut := utils.GetWriter(func(line string) { listener(api.ContainerEvent{ - Type: api.HookEventLog, - Container: getContainerNameWithoutProject(ctr) + " ->", - ID: ctr.ID, - Service: service.Name, - Line: line, + Type: api.HookEventLog, + Source: getContainerNameWithoutProject(ctr) + " ->", + ID: ctr.ID, + Service: service.Name, + Line: line, }) }) defer wOut.Close() //nolint:errcheck diff --git a/pkg/compose/logs.go b/pkg/compose/logs.go index b9a108fc3..2976dace4 100644 --- a/pkg/compose/logs.go +++ b/pkg/compose/logs.go @@ -62,7 +62,7 @@ func (s *composeService) Logs( eg, ctx := errgroup.WithContext(ctx) for _, ctr := range containers { eg.Go(func() error { - err := s.logContainers(ctx, consumer, ctr, options) + err := s.logContainer(ctx, consumer, ctr, options) if errdefs.IsNotImplemented(err) { logrus.Warnf("Can't retrieve logs for %q: %s", getCanonicalContainerName(ctr), err.Error()) return nil @@ -72,34 +72,21 @@ func (s *composeService) Logs( } if options.Follow { - containers = containers.filter(isRunning()) printer := newLogPrinter(consumer) - eg.Go(func() error { - _, err := printer.Run(api.CascadeIgnore, "", nil) - return err - }) + eg.Go(printer.Run) - for _, c := range containers { - printer.HandleEvent(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: getContainerNameWithoutProject(c), - ID: c.ID, - Service: c.Labels[api.ServiceLabel], - }) - } - - eg.Go(func() error { - err := s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c container.Summary, t time.Time) error { - printer.HandleEvent(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: getContainerNameWithoutProject(c), - ID: c.ID, - Service: c.Labels[api.ServiceLabel], - }) + monitor := newMonitor(s.apiClient(), options.Project) + monitor.withListener(func(event api.ContainerEvent) { + if event.Type == api.ContainerEventStarted { eg.Go(func() error { - err := s.logContainers(ctx, consumer, c, api.LogOptions{ + ctr, err := s.apiClient().ContainerInspect(ctx, event.ID) + if err != nil { + return err + } + + err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{ Follow: options.Follow, - Since: t.Format(time.RFC3339Nano), + Since: time.Unix(0, event.Time).Format(time.RFC3339Nano), Until: options.Until, Tail: options.Tail, Timestamps: options.Timestamps, @@ -110,31 +97,28 @@ func (s *composeService) Logs( } return err }) - return nil - }, func(c container.Summary, t time.Time) error { - printer.HandleEvent(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: "", // actual name will be set by start event - ID: c.ID, - Service: c.Labels[api.ServiceLabel], - }) - return nil - }) - printer.Stop() - return err + } + }) + eg.Go(func() error { + defer printer.Stop() + return monitor.Start(ctx) }) } return eg.Wait() } -func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error { - cnt, err := s.apiClient().ContainerInspect(ctx, c.ID) +func (s *composeService) logContainer(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error { + ctr, err := s.apiClient().ContainerInspect(ctx, c.ID) if err != nil { return err } + name := getContainerNameWithoutProject(c) + return s.doLogContainer(ctx, consumer, name, ctr, options) +} - r, err := s.apiClient().ContainerLogs(ctx, cnt.ID, container.LogsOptions{ +func (s *composeService) doLogContainer(ctx context.Context, consumer api.LogConsumer, name string, ctr container.InspectResponse, options api.LogOptions) error { + r, err := s.apiClient().ContainerLogs(ctx, ctr.ID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, Follow: options.Follow, @@ -148,11 +132,10 @@ func (s *composeService) logContainers(ctx context.Context, consumer api.LogCons } defer r.Close() //nolint:errcheck - name := getContainerNameWithoutProject(c) w := utils.GetWriter(func(line string) { consumer.Log(name, line) }) - if cnt.Config.Tty { + if ctr.Config.Tty { _, err = io.Copy(w, r) } else { _, err = stdcopy.StdCopy(w, w, r) diff --git a/pkg/compose/logs_test.go b/pkg/compose/logs_test.go index 46893e636..955b5e770 100644 --- a/pkg/compose/logs_test.go +++ b/pkg/compose/logs_test.go @@ -189,8 +189,6 @@ func (l *testLogConsumer) Err(containerName, message string) { func (l *testLogConsumer) Status(containerName, msg string) {} -func (l *testLogConsumer) Register(containerName string) {} - func (l *testLogConsumer) LogsForContainer(containerName string) []string { l.mu.Lock() defer l.mu.Unlock() diff --git a/pkg/compose/monitor.go b/pkg/compose/monitor.go new file mode 100644 index 000000000..624f01d5d --- /dev/null +++ b/pkg/compose/monitor.go @@ -0,0 +1,211 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "context" + "strconv" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/containerd/errdefs" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" + "github.com/sirupsen/logrus" + + "github.com/docker/compose/v2/pkg/api" + "github.com/docker/compose/v2/pkg/utils" +) + +type monitor struct { + api client.APIClient + project *types.Project + // services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command + services map[string]bool + listeners []api.ContainerEventListener +} + +func newMonitor(api client.APIClient, project *types.Project) *monitor { + services := map[string]bool{} + if project != nil { + for name := range project.Services { + services[name] = true + } + } + return &monitor{ + api: api, + project: project, + services: services, + } +} + +// Start runs monitor to detect application events and return after termination +// +//nolint:gocyclo +func (c *monitor) Start(ctx context.Context) error { + // collect initial application container + initialState, err := c.api.ContainerList(ctx, container.ListOptions{ + All: true, + Filters: filters.NewArgs( + projectFilter(c.project.Name), + oneOffFilter(false), + hasConfigHashLabel(), + ), + }) + if err != nil { + return err + } + + // containers is the set if container IDs the application is based on + containers := utils.Set[string]{} + for _, ctr := range initialState { + if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] { + containers.Add(ctr.ID) + } + } + + restarting := utils.Set[string]{} + + evtCh, errCh := c.api.Events(ctx, events.ListOptions{ + Filters: filters.NewArgs( + filters.Arg("type", "container"), + projectFilter(c.project.Name)), + }) + for { + select { + case <-ctx.Done(): + return nil + case err := <-errCh: + return err + case event := <-evtCh: + if !c.services[event.Actor.Attributes[api.ServiceLabel]] { + continue + } + ctr, err := c.getContainerSummary(event) + if err != nil { + return err + } + + switch event.Action { + case events.ActionCreate: + containers.Add(ctr.ID) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated)) + } + logrus.Debugf("container %s created", ctr.Name) + case events.ActionStart: + restarted := restarting.Has(ctr.ID) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { + e.Restarting = restarted + })) + } + if restarted { + logrus.Debugf("container %s restarted", ctr.Name) + } else { + logrus.Debugf("container %s started", ctr.Name) + } + containers.Add(ctr.ID) + case events.ActionRestart: + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted)) + } + logrus.Debugf("container %s restarted", ctr.Name) + case events.ActionStop: + // when a container is in restarting phase, and we stop the application (abort-on-container-exit) + // we won't get any additional start+die events, just this stop as a proof container is down + logrus.Debugf("container %s stopped", ctr.Name) + containers.Remove(ctr.ID) + case events.ActionDie: + logrus.Debugf("container %s exited with code %d", ctr.Name, ctr.ExitCode) + inspect, err := c.api.ContainerInspect(ctx, event.Actor.ID) + if errdefs.IsNotFound(err) { + // Source is already removed + } else if err != nil { + return err + } + + if inspect.State != nil && inspect.State.Restarting || inspect.State.Running { + // State.Restarting is set by engine when container is configured to restart on exit + // on ContainerRestart it doesn't (see https://github.com/moby/moby/issues/45538) + // container state still is reported as "running" + logrus.Debugf("container %s is restarting", ctr.Name) + restarting.Add(ctr.ID) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventExited, func(e *api.ContainerEvent) { + e.Restarting = true + })) + } + } else { + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventExited)) + } + containers.Remove(ctr.ID) + } + } + } + if len(containers) == 0 { + return nil + } + } +} + +func newContainerEvent(timeNano int64, ctr *api.ContainerSummary, eventType int, opts ...func(e *api.ContainerEvent)) api.ContainerEvent { + name := ctr.Name + defaultName := getDefaultContainerName(ctr.Project, ctr.Labels[api.ServiceLabel], ctr.Labels[api.ContainerNumberLabel]) + if name == defaultName { + // remove project- prefix + name = name[len(ctr.Project)+1:] + } + + event := api.ContainerEvent{ + Type: eventType, + Container: ctr, + Time: timeNano, + Source: name, + ID: ctr.ID, + Service: ctr.Service, + ExitCode: ctr.ExitCode, + } + for _, opt := range opts { + opt(&event) + } + return event +} + +func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSummary, error) { + ctr := &api.ContainerSummary{ + ID: event.Actor.ID, + Name: event.Actor.Attributes["name"], + Project: c.project.Name, + Service: event.Actor.Attributes[api.ServiceLabel], + Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us + } + if ec, ok := event.Actor.Attributes["exitCode"]; ok { + exitCode, err := strconv.Atoi(ec) + if err != nil { + return nil, err + } + ctr.ExitCode = exitCode + } + return ctr, nil +} + +func (c *monitor) withListener(listener api.ContainerEventListener) { + c.listeners = append(c.listeners, listener) +} diff --git a/pkg/compose/printer.go b/pkg/compose/printer.go index 338312fae..55084270a 100644 --- a/pkg/compose/printer.go +++ b/pkg/compose/printer.go @@ -26,8 +26,7 @@ import ( // logPrinter watch application containers and collect their logs type logPrinter interface { HandleEvent(event api.ContainerEvent) - Run(cascade api.Cascade, exitCodeFrom string, stopFn func() error) (int, error) - Cancel() + Run() error Stop() } @@ -49,11 +48,6 @@ func newLogPrinter(consumer api.LogConsumer) logPrinter { return &printer } -func (p *printer) Cancel() { - // note: HandleEvent is used to ensure this doesn't deadlock - p.HandleEvent(api.ContainerEvent{Type: api.UserCancel}) -} - func (p *printer) Stop() { p.stop.Do(func() { close(p.stopCh) @@ -78,82 +72,25 @@ func (p *printer) HandleEvent(event api.ContainerEvent) { } } -//nolint:gocyclo -func (p *printer) Run(cascade api.Cascade, exitCodeFrom string, stopFn func() error) (int, error) { - var ( - aborting bool - exitCode int - ) +func (p *printer) Run() error { defer p.Stop() // containers we are tracking. Use true when container is running, false after we receive a stop|die signal - containers := map[string]bool{} for { select { case <-p.stopCh: - return exitCode, nil + return nil case event := <-p.queue: - container, id := event.Container, event.ID switch event.Type { - case api.UserCancel: - aborting = true - case api.ContainerEventAttach: - if attached, ok := containers[id]; ok && attached { - continue - } - containers[id] = true - p.consumer.Register(container) - case api.ContainerEventExit, api.ContainerEventStopped, api.ContainerEventRecreated: - if !aborting && containers[id] { - p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode)) - if event.Type == api.ContainerEventRecreated { - p.consumer.Status(container, "has been recreated") - } - } - containers[id] = false - if !event.Restarting { - delete(containers, id) - } - - if cascade == api.CascadeStop { - if !aborting { - aborting = true - err := stopFn() - if err != nil { - return 0, err - } - } - } - if event.Type == api.ContainerEventExit { - if cascade == api.CascadeFail && event.ExitCode != 0 { - exitCodeFrom = event.Service - if !aborting { - aborting = true - err := stopFn() - if err != nil { - return 0, err - } - } - } - if cascade == api.CascadeStop && exitCodeFrom == "" { - exitCodeFrom = event.Service - } - } - - if exitCodeFrom == event.Service && (event.Type == api.ContainerEventExit || event.Type == api.ContainerEventStopped) { - // Container was interrupted or exited, let's capture exit code - exitCode = event.ExitCode - } - if len(containers) == 0 { - // Last container terminated, done - return exitCode, nil + case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted: + p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) + if event.Type == api.ContainerEventRecreated { + p.consumer.Status(event.Source, "has been recreated") } case api.ContainerEventLog, api.HookEventLog: - p.consumer.Log(container, event.Line) + p.consumer.Log(event.Source, event.Line) case api.ContainerEventErr: - if !aborting { - p.consumer.Err(container, event.Line) - } + p.consumer.Err(event.Source, event.Line) } } } diff --git a/pkg/compose/start.go b/pkg/compose/start.go index a18487b50..1d1c264cf 100644 --- a/pkg/compose/start.go +++ b/pkg/compose/start.go @@ -20,14 +20,10 @@ import ( "context" "errors" "fmt" - "slices" "strings" - "time" - cerrdefs "github.com/containerd/errdefs" "github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/progress" - "github.com/docker/compose/v2/pkg/utils" containerType "github.com/docker/docker/api/types/container" "github.com/compose-spec/compose-go/v2/types" @@ -66,48 +62,6 @@ func (s *composeService) start(ctx context.Context, projectName string, options if err != nil { return err } - - eg.Go(func() error { - // it's possible to have a required service whose log output is not desired - // (i.e. it's not in the attach set), so watch everything and then filter - // calls to attach; this ensures that `watchContainers` blocks until all - // required containers have exited, even if their output is not being shown - attachTo := utils.NewSet[string](options.AttachTo...) - required := utils.NewSet[string](options.Services...) - toWatch := attachTo.Union(required).Elements() - - containers, err := s.getContainers(ctx, projectName, oneOffExclude, true, toWatch...) - if err != nil { - return err - } - - // N.B. this uses the parent context (instead of attachCtx) so that the watch itself can - // continue even if one of the log streams fails - return s.watchContainers(ctx, project.Name, toWatch, required.Elements(), listener, containers, - func(ctr containerType.Summary, _ time.Time) error { - svc := ctr.Labels[api.ServiceLabel] - if attachTo.Has(svc) { - return s.attachContainer(attachCtx, ctr, listener) - } - - // HACK: simulate an "attach" event - listener(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: getContainerNameWithoutProject(ctr), - ID: ctr.ID, - Service: svc, - }) - return nil - }, func(ctr containerType.Summary, _ time.Time) error { - listener(api.ContainerEvent{ - Type: api.ContainerEventAttach, - Container: "", // actual name will be set by start event - ID: ctr.ID, - Service: ctr.Labels[api.ServiceLabel], - }) - return nil - }) - }) } var containers Containers @@ -173,182 +127,3 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project) } return ServiceConditionRunningOrHealthy } - -type containerWatchFn func(ctr containerType.Summary, t time.Time) error - -// watchContainers uses engine events to capture container start/die and notify ContainerEventListener -func (s *composeService) watchContainers(ctx context.Context, //nolint:gocyclo - projectName string, services, required []string, - listener api.ContainerEventListener, containers Containers, onStart, onRecreate containerWatchFn, -) error { - if len(containers) == 0 { - return nil - } - if len(required) == 0 { - required = services - } - - unexpected := utils.NewSet[string](required...).Diff(utils.NewSet[string](services...)) - if len(unexpected) != 0 { - return fmt.Errorf(`required service(s) "%s" not present in watched service(s) "%s"`, - strings.Join(unexpected.Elements(), ", "), - strings.Join(services, ", ")) - } - - // predicate to tell if a container we receive event for should be considered or ignored - ofInterest := func(c containerType.Summary) bool { - if len(services) > 0 { - // we only watch some services - return slices.Contains(services, c.Labels[api.ServiceLabel]) - } - return true - } - - // predicate to tell if a container we receive event for should be watched until termination - isRequired := func(c containerType.Summary) bool { - if len(services) > 0 && len(required) > 0 { - // we only watch some services - return slices.Contains(required, c.Labels[api.ServiceLabel]) - } - return true - } - - var ( - expected = utils.NewSet[string]() - watched = map[string]int{} - replaced []string - ) - for _, c := range containers { - if isRequired(c) { - expected.Add(c.ID) - } - watched[c.ID] = 0 - } - - ctx, stop := context.WithCancel(ctx) - err := s.Events(ctx, projectName, api.EventsOptions{ - Services: services, - Consumer: func(event api.Event) error { - defer func() { - // after consuming each event, check to see if we're done - if len(expected) == 0 { - stop() - } - }() - inspected, err := s.apiClient().ContainerInspect(ctx, event.Container) - if err != nil { - if cerrdefs.IsNotFound(err) { - // it's possible to get "destroy" or "kill" events but not - // be able to inspect in time before they're gone from the - // API, so just remove the watch without erroring - delete(watched, event.Container) - expected.Remove(event.Container) - return nil - } - return err - } - container := containerType.Summary{ - ID: inspected.ID, - Names: []string{inspected.Name}, - Labels: inspected.Config.Labels, - } - name := getContainerNameWithoutProject(container) - service := container.Labels[api.ServiceLabel] - switch event.Status { - case "stop": - if inspected.State.Running { - // on sync+restart action the container stops -> dies -> start -> restart - // we do not want to stop the current container, we want to restart it - return nil - } - if _, ok := watched[container.ID]; ok { - eType := api.ContainerEventStopped - if slices.Contains(replaced, container.ID) { - replaced = slices.DeleteFunc(replaced, func(e string) bool { return e == container.ID }) - eType = api.ContainerEventRecreated - } - listener(api.ContainerEvent{ - Type: eType, - Container: name, - ID: container.ID, - Service: service, - ExitCode: inspected.State.ExitCode, - }) - } - - delete(watched, container.ID) - expected.Remove(container.ID) - case "die": - restarted := watched[container.ID] - watched[container.ID] = restarted + 1 - // Container terminated. - willRestart := inspected.State.Restarting - if inspected.State.Running { - // on sync+restart action inspected.State.Restarting is false, - // however the container is already running before it restarts - willRestart = true - } - - eType := api.ContainerEventExit - if slices.Contains(replaced, container.ID) { - replaced = slices.DeleteFunc(replaced, func(e string) bool { return e == container.ID }) - eType = api.ContainerEventRecreated - } - - listener(api.ContainerEvent{ - Type: eType, - Container: name, - ID: container.ID, - Service: service, - ExitCode: inspected.State.ExitCode, - Restarting: willRestart, - }) - - if !willRestart { - // we're done with this one - delete(watched, container.ID) - expected.Remove(container.ID) - } - case "start": - count, ok := watched[container.ID] - mustAttach := ok && count > 0 // Container restarted, need to re-attach - if !ok { - // A new container has just been added to service by scale - watched[container.ID] = 0 - expected.Add(container.ID) - mustAttach = true - } - if mustAttach { - // Container restarted, need to re-attach - err := onStart(container, event.Timestamp) - if err != nil { - return err - } - } - case "create": - if id, ok := container.Labels[api.ContainerReplaceLabel]; ok { - replaced = append(replaced, id) - err = onRecreate(container, event.Timestamp) - if err != nil { - return err - } - if expected.Has(id) { - expected.Add(inspected.ID) - expected.Add(container.ID) - } - watched[container.ID] = 1 - } else if ofInterest(container) { - watched[container.ID] = 1 - if isRequired(container) { - expected.Add(container.ID) - } - } - } - return nil - }, - }) - if errors.Is(ctx.Err(), context.Canceled) { - return nil - } - return err -} diff --git a/pkg/compose/up.go b/pkg/compose/up.go index dda37f68e..85fbe1ef6 100644 --- a/pkg/compose/up.go +++ b/pkg/compose/up.go @@ -159,24 +159,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } }) - var exitCode int - eg.Go(func() error { - code, err := printer.Run(options.Start.OnExit, options.Start.ExitCodeFrom, func() error { - _, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...") - eg.Go(func() error { - return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { - return s.stop(ctx, project.Name, api.StopOptions{ - Services: options.Create.Services, - Project: project, - }, printer.HandleEvent) - }, s.stdinfo(), logConsumer) - }) - return nil - }) - exitCode = code - return err - }) - if options.Start.Watch && watcher != nil { err = watcher.Start(ctx) if err != nil { @@ -184,17 +166,75 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } } + monitor := newMonitor(s.apiClient(), project) + monitor.withListener(printer.HandleEvent) + + var exitCode int + if options.Start.OnExit != api.CascadeIgnore { + once := true + // detect first container to exit to trigger application shutdown + monitor.withListener(func(event api.ContainerEvent) { + if once && event.Type == api.ContainerEventExited { + exitCode = event.ExitCode + printer.Stop() + _, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...") + eg.Go(func() error { + return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { + return s.stop(ctx, project.Name, api.StopOptions{ + Services: options.Create.Services, + Project: project, + }, printer.HandleEvent) + }, s.stdinfo(), logConsumer) + }) + once = false + } + }) + } + + if options.Start.ExitCodeFrom != "" { + once := true + // capture exit code from first container to exit with selected service + monitor.withListener(func(event api.ContainerEvent) { + if once && event.Type == api.ContainerEventExited && event.Service == options.Start.ExitCodeFrom { + exitCode = event.ExitCode + once = false + } + }) + } + + monitor.withListener(func(event api.ContainerEvent) { + mustAttach := false + switch event.Type { + case api.ContainerEventCreated: + // A container has been added to the application (scale) + mustAttach = true + case api.ContainerEventStarted: + // A container is restarting - need to re-attach + mustAttach = event.Restarting + } + if mustAttach { + eg.Go(func() error { + // FIXME as container already started, we might miss the very first logs + return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) + }) + } + }) + + eg.Go(func() error { + err := monitor.Start(ctx) + fmt.Println("monitor complete") + // Signal for the signal-handler goroutines to stop + close(doneCh) + printer.Stop() + return err + }) + // We use the parent context without cancellation as we manage sigterm to stop the stack err = s.start(context.WithoutCancel(ctx), project.Name, options.Start, printer.HandleEvent) if err != nil && !isTerminated.Load() { // Ignore error if the process is terminated return err } - // Signal for the signal-handler goroutines to stop - close(doneCh) - - printer.Stop() - err = eg.Wait().ErrorOrNil() if exitCode != 0 { errMsg := "" diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 80ed42acd..f8edf3418 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -192,7 +192,6 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, opti return nil, err } eg, ctx := errgroup.WithContext(ctx) - options.LogTo.Register(api.WatchLogger) var ( rules []watchRule diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index e7492f2bc..c009fdfc0 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -71,9 +71,6 @@ func (s stdLogger) Status(containerName, msg string) { fmt.Printf("%s: %s\n", containerName, msg) } -func (s stdLogger) Register(containerName string) { -} - func TestWatch_Sync(t *testing.T) { mockCtrl := gomock.NewController(t) cli := mocks.NewMockCli(mockCtrl) diff --git a/pkg/prompt/prompt_mock.go b/pkg/prompt/prompt_mock.go index 6b7767415..83b0ff118 100644 --- a/pkg/prompt/prompt_mock.go +++ b/pkg/prompt/prompt_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Container: github.com/docker/compose-cli/pkg/prompt (interfaces: UI) +// Source: github.com/docker/compose-cli/pkg/prompt (interfaces: UI) // Package prompt is a generated GoMock package. package prompt