Detect new containers on logs --follow

Signed-off-by: Ulysses Souza <ulyssessouza@gmail.com>
This commit is contained in:
Ulysses Souza 2021-06-28 09:56:17 -03:00
parent 396d449d36
commit 287f3156ae
4 changed files with 76 additions and 62 deletions

View File

@ -101,7 +101,7 @@ func (s *composeService) up(ctx context.Context, project *types.Project) error {
})
} else {
//update stack
// update stack
eventName = "Updating Compose stack"
w.Event(progress.CreatingEvent(eventName))

View File

@ -96,7 +96,7 @@ func (s *ServiceProxy) WithInterceptor(interceptors ...Interceptor) *ServiceProx
return s
}
//Build implements Service interface
// Build implements Service interface
func (s *ServiceProxy) Build(ctx context.Context, project *types.Project, options BuildOptions) error {
if s.BuildFn == nil {
return ErrNotImplemented
@ -107,7 +107,7 @@ func (s *ServiceProxy) Build(ctx context.Context, project *types.Project, option
return s.BuildFn(ctx, project, options)
}
//Push implements Service interface
// Push implements Service interface
func (s *ServiceProxy) Push(ctx context.Context, project *types.Project, options PushOptions) error {
if s.PushFn == nil {
return ErrNotImplemented
@ -118,7 +118,7 @@ func (s *ServiceProxy) Push(ctx context.Context, project *types.Project, options
return s.PushFn(ctx, project, options)
}
//Pull implements Service interface
// Pull implements Service interface
func (s *ServiceProxy) Pull(ctx context.Context, project *types.Project, options PullOptions) error {
if s.PullFn == nil {
return ErrNotImplemented
@ -129,7 +129,7 @@ func (s *ServiceProxy) Pull(ctx context.Context, project *types.Project, options
return s.PullFn(ctx, project, options)
}
//Create implements Service interface
// Create implements Service interface
func (s *ServiceProxy) Create(ctx context.Context, project *types.Project, options CreateOptions) error {
if s.CreateFn == nil {
return ErrNotImplemented
@ -140,7 +140,7 @@ func (s *ServiceProxy) Create(ctx context.Context, project *types.Project, optio
return s.CreateFn(ctx, project, options)
}
//Start implements Service interface
// Start implements Service interface
func (s *ServiceProxy) Start(ctx context.Context, project *types.Project, options StartOptions) error {
if s.StartFn == nil {
return ErrNotImplemented
@ -151,7 +151,7 @@ func (s *ServiceProxy) Start(ctx context.Context, project *types.Project, option
return s.StartFn(ctx, project, options)
}
//Restart implements Service interface
// Restart implements Service interface
func (s *ServiceProxy) Restart(ctx context.Context, project *types.Project, options RestartOptions) error {
if s.RestartFn == nil {
return ErrNotImplemented
@ -162,7 +162,7 @@ func (s *ServiceProxy) Restart(ctx context.Context, project *types.Project, opti
return s.RestartFn(ctx, project, options)
}
//Stop implements Service interface
// Stop implements Service interface
func (s *ServiceProxy) Stop(ctx context.Context, project *types.Project, options StopOptions) error {
if s.StopFn == nil {
return ErrNotImplemented
@ -173,7 +173,7 @@ func (s *ServiceProxy) Stop(ctx context.Context, project *types.Project, options
return s.StopFn(ctx, project, options)
}
//Up implements Service interface
// Up implements Service interface
func (s *ServiceProxy) Up(ctx context.Context, project *types.Project, options UpOptions) error {
if s.UpFn == nil {
return ErrNotImplemented
@ -184,7 +184,7 @@ func (s *ServiceProxy) Up(ctx context.Context, project *types.Project, options U
return s.UpFn(ctx, project, options)
}
//Down implements Service interface
// Down implements Service interface
func (s *ServiceProxy) Down(ctx context.Context, project string, options DownOptions) error {
if s.DownFn == nil {
return ErrNotImplemented
@ -192,15 +192,15 @@ func (s *ServiceProxy) Down(ctx context.Context, project string, options DownOpt
return s.DownFn(ctx, project, options)
}
//Logs implements Service interface
func (s *ServiceProxy) Logs(ctx context.Context, project string, consumer LogConsumer, options LogOptions) error {
// Logs implements Service interface
func (s *ServiceProxy) Logs(ctx context.Context, projectName string, consumer LogConsumer, options LogOptions) error {
if s.LogsFn == nil {
return ErrNotImplemented
}
return s.LogsFn(ctx, project, consumer, options)
return s.LogsFn(ctx, projectName, consumer, options)
}
//Ps implements Service interface
// Ps implements Service interface
func (s *ServiceProxy) Ps(ctx context.Context, project string, options PsOptions) ([]ContainerSummary, error) {
if s.PsFn == nil {
return nil, ErrNotImplemented
@ -208,7 +208,7 @@ func (s *ServiceProxy) Ps(ctx context.Context, project string, options PsOptions
return s.PsFn(ctx, project, options)
}
//List implements Service interface
// List implements Service interface
func (s *ServiceProxy) List(ctx context.Context, options ListOptions) ([]Stack, error) {
if s.ListFn == nil {
return nil, ErrNotImplemented
@ -216,7 +216,7 @@ func (s *ServiceProxy) List(ctx context.Context, options ListOptions) ([]Stack,
return s.ListFn(ctx, options)
}
//Convert implements Service interface
// Convert implements Service interface
func (s *ServiceProxy) Convert(ctx context.Context, project *types.Project, options ConvertOptions) ([]byte, error) {
if s.ConvertFn == nil {
return nil, ErrNotImplemented
@ -227,7 +227,7 @@ func (s *ServiceProxy) Convert(ctx context.Context, project *types.Project, opti
return s.ConvertFn(ctx, project, options)
}
//Kill implements Service interface
// Kill implements Service interface
func (s *ServiceProxy) Kill(ctx context.Context, project *types.Project, options KillOptions) error {
if s.KillFn == nil {
return ErrNotImplemented
@ -238,7 +238,7 @@ func (s *ServiceProxy) Kill(ctx context.Context, project *types.Project, options
return s.KillFn(ctx, project, options)
}
//RunOneOffContainer implements Service interface
// RunOneOffContainer implements Service interface
func (s *ServiceProxy) RunOneOffContainer(ctx context.Context, project *types.Project, options RunOptions) (int, error) {
if s.RunOneOffContainerFn == nil {
return 0, ErrNotImplemented
@ -249,7 +249,7 @@ func (s *ServiceProxy) RunOneOffContainer(ctx context.Context, project *types.Pr
return s.RunOneOffContainerFn(ctx, project, options)
}
//Remove implements Service interface
// Remove implements Service interface
func (s *ServiceProxy) Remove(ctx context.Context, project *types.Project, options RemoveOptions) error {
if s.RemoveFn == nil {
return ErrNotImplemented
@ -260,7 +260,7 @@ func (s *ServiceProxy) Remove(ctx context.Context, project *types.Project, optio
return s.RemoveFn(ctx, project, options)
}
//Exec implements Service interface
// Exec implements Service interface
func (s *ServiceProxy) Exec(ctx context.Context, project *types.Project, options RunOptions) (int, error) {
if s.ExecFn == nil {
return 0, ErrNotImplemented
@ -271,7 +271,7 @@ func (s *ServiceProxy) Exec(ctx context.Context, project *types.Project, options
return s.ExecFn(ctx, project, options)
}
//Copy implements Service interface
// Copy implements Service interface
func (s *ServiceProxy) Copy(ctx context.Context, project *types.Project, options CopyOptions) error {
if s.CopyFn == nil {
return ErrNotImplemented
@ -282,7 +282,7 @@ func (s *ServiceProxy) Copy(ctx context.Context, project *types.Project, options
return s.CopyFn(ctx, project, options)
}
//Pause implements Service interface
// Pause implements Service interface
func (s *ServiceProxy) Pause(ctx context.Context, project string, options PauseOptions) error {
if s.PauseFn == nil {
return ErrNotImplemented
@ -290,7 +290,7 @@ func (s *ServiceProxy) Pause(ctx context.Context, project string, options PauseO
return s.PauseFn(ctx, project, options)
}
//UnPause implements Service interface
// UnPause implements Service interface
func (s *ServiceProxy) UnPause(ctx context.Context, project string, options PauseOptions) error {
if s.UnPauseFn == nil {
return ErrNotImplemented
@ -298,7 +298,7 @@ func (s *ServiceProxy) UnPause(ctx context.Context, project string, options Paus
return s.UnPauseFn(ctx, project, options)
}
//Top implements Service interface
// Top implements Service interface
func (s *ServiceProxy) Top(ctx context.Context, project string, services []string) ([]ContainerProcSummary, error) {
if s.TopFn == nil {
return nil, ErrNotImplemented
@ -306,7 +306,7 @@ func (s *ServiceProxy) Top(ctx context.Context, project string, services []strin
return s.TopFn(ctx, project, services)
}
//Events implements Service interface
// Events implements Service interface
func (s *ServiceProxy) Events(ctx context.Context, project string, options EventsOptions) error {
if s.EventsFn == nil {
return ErrNotImplemented
@ -314,7 +314,7 @@ func (s *ServiceProxy) Events(ctx context.Context, project string, options Event
return s.EventsFn(ctx, project, options)
}
//Port implements Service interface
// Port implements Service interface
func (s *ServiceProxy) Port(ctx context.Context, project string, service string, port int, options PortOptions) (string, int, error) {
if s.PortFn == nil {
return "", 0, ErrNotImplemented
@ -322,7 +322,7 @@ func (s *ServiceProxy) Port(ctx context.Context, project string, service string,
return s.PortFn(ctx, project, service, port, options)
}
//Images implements Service interface
// Images implements Service interface
func (s *ServiceProxy) Images(ctx context.Context, project string, options ImagesOptions) ([]ImageSummary, error) {
if s.ImagesFn == nil {
return nil, ErrNotImplemented

View File

@ -20,54 +20,68 @@ import (
"context"
"io"
"github.com/docker/compose-cli/pkg/api"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/stdcopy"
"golang.org/x/sync/errgroup"
"github.com/docker/compose-cli/pkg/api"
"github.com/docker/compose-cli/pkg/utils"
"github.com/docker/docker/api/types"
)
func (s *composeService) Logs(ctx context.Context, projectName string, consumer api.LogConsumer, options api.LogOptions) error {
containers, err := s.getContainers(ctx, projectName, oneOffExclude, true, options.Services...)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
service := c.Labels[api.ServiceLabel]
container, err := s.apiClient.ContainerInspect(ctx, c.ID)
if err != nil {
return err
}
name := getContainerNameWithoutProject(c)
if options.Follow {
eg.Go(func() error {
r, err := s.apiClient.ContainerLogs(ctx, container.ID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: options.Follow,
Since: options.Since,
Until: options.Until,
Tail: options.Tail,
Timestamps: options.Timestamps,
printer := newLogPrinter(consumer)
return s.watchContainers(projectName, options.Services, printer.HandleEvent, containers, func(c types.Container) error {
return s.logContainers(ctx, consumer, c, options)
})
if err != nil {
return err
}
defer r.Close() // nolint errcheck
})
}
w := utils.GetWriter(func(line string) {
consumer.Log(name, service, line)
})
if container.Config.Tty {
_, err = io.Copy(w, r)
} else {
_, err = stdcopy.StdCopy(w, w, r)
}
return err
for _, c := range containers {
c := c
eg.Go(func() error {
return s.logContainers(ctx, consumer, c, options)
})
}
return eg.Wait()
}
func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c types.Container, options api.LogOptions) error {
cnt, err := s.apiClient.ContainerInspect(ctx, c.ID)
if err != nil {
return err
}
service := c.Labels[api.ServiceLabel]
r, err := s.apiClient.ContainerLogs(ctx, cnt.ID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: options.Follow,
Since: options.Since,
Until: options.Until,
Tail: options.Tail,
Timestamps: options.Timestamps,
})
if err != nil {
return err
}
defer r.Close() // nolint errcheck
name := getContainerNameWithoutProject(c)
w := utils.GetWriter(func(line string) {
consumer.Log(name, service, line)
})
if cnt.Config.Tty {
_, err = io.Copy(w, r)
} else {
_, err = stdcopy.StdCopy(w, w, r)
}
return err
}

View File

@ -47,7 +47,7 @@ func (s *composeService) start(ctx context.Context, project *types.Project, opti
}
eg.Go(func() error {
return s.watchContainers(project, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.watchContainers(project.Name, options.AttachTo, listener, attached, func(container moby.Container) error {
return s.attachContainer(ctx, container, listener, project)
})
})
@ -69,14 +69,14 @@ func (s *composeService) start(ctx context.Context, project *types.Project, opti
type containerWatchFn func(container moby.Container) error
// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
func (s *composeService) watchContainers(project *types.Project, services []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
func (s *composeService) watchContainers(projectName string, services []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
watched := map[string]int{}
for _, c := range containers {
watched[c.ID] = 0
}
ctx, stop := context.WithCancel(context.Background())
err := s.Events(ctx, project.Name, api.EventsOptions{
err := s.Events(ctx, projectName, api.EventsOptions{
Services: services,
Consumer: func(event api.Event) error {
inspected, err := s.apiClient.ContainerInspect(ctx, event.Container)