mirror of https://github.com/docker/compose.git
Merge pull request #1331 from docker/kube_e2e_failure
pass service to LogConsumer
This commit is contained in:
commit
988813b130
|
@ -218,7 +218,7 @@ type Stack struct {
|
||||||
|
|
||||||
// LogConsumer is a callback to process log messages from services
|
// LogConsumer is a callback to process log messages from services
|
||||||
type LogConsumer interface {
|
type LogConsumer interface {
|
||||||
Log(name, container, message string)
|
Log(name, service, container, message string)
|
||||||
Status(name, container, msg string)
|
Status(name, container, msg string)
|
||||||
Register(name string, source string)
|
Register(name string, source string)
|
||||||
}
|
}
|
||||||
|
|
|
@ -347,7 +347,7 @@ func (p printer) run(ctx context.Context, cascadeStop bool, exitCodeFrom string,
|
||||||
}
|
}
|
||||||
case compose.ContainerEventLog:
|
case compose.ContainerEventLog:
|
||||||
if !aborting {
|
if !aborting {
|
||||||
consumer.Log(event.Name, event.Source, event.Line)
|
consumer.Log(event.Name, event.Service, event.Source, event.Line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,13 +62,13 @@ func (l *logConsumer) register(name string, id string) *presenter {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log formats a log message as received from name/container
|
// Log formats a log message as received from name/container
|
||||||
func (l *logConsumer) Log(name, id, message string) {
|
func (l *logConsumer) Log(name, service, container, message string) {
|
||||||
if l.ctx.Err() != nil {
|
if l.ctx.Err() != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p, ok := l.presenters[id]
|
p, ok := l.presenters[container]
|
||||||
if !ok { // should have been registered, but ¯\_(ツ)_/¯
|
if !ok { // should have been registered, but ¯\_(ツ)_/¯
|
||||||
p = l.register(name, id)
|
p = l.register(name, container)
|
||||||
}
|
}
|
||||||
for _, line := range strings.Split(message, "\n") {
|
for _, line := range strings.Split(message, "\n") {
|
||||||
fmt.Fprintf(l.writer, "%s %s\n", p.prefix, line) // nolint:errcheck
|
fmt.Fprintf(l.writer, "%s %s\n", p.prefix, line) // nolint:errcheck
|
||||||
|
|
|
@ -63,7 +63,7 @@ type API interface {
|
||||||
InspectSecret(ctx context.Context, id string) (secrets.Secret, error)
|
InspectSecret(ctx context.Context, id string) (secrets.Secret, error)
|
||||||
ListSecrets(ctx context.Context) ([]secrets.Secret, error)
|
ListSecrets(ctx context.Context) ([]secrets.Secret, error)
|
||||||
DeleteSecret(ctx context.Context, id string, recover bool) error
|
DeleteSecret(ctx context.Context, id string, recover bool) error
|
||||||
GetLogs(ctx context.Context, name string, consumer func(service string, container string, message string), follow bool) error
|
GetLogs(ctx context.Context, name string, consumer func(name string, service string, container string, message string), follow bool) error
|
||||||
DescribeService(ctx context.Context, cluster string, arn string) (compose.ServiceStatus, error)
|
DescribeService(ctx context.Context, cluster string, arn string) (compose.ServiceStatus, error)
|
||||||
DescribeServiceTasks(ctx context.Context, cluster string, project string, service string) ([]compose.ContainerSummary, error)
|
DescribeServiceTasks(ctx context.Context, cluster string, project string, service string) ([]compose.ContainerSummary, error)
|
||||||
getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error)
|
getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error)
|
||||||
|
|
|
@ -285,7 +285,7 @@ func (mr *MockAPIMockRecorder) GetLoadBalancerURL(arg0, arg1 interface{}) *gomoc
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLogs mocks base method
|
// GetLogs mocks base method
|
||||||
func (m *MockAPI) GetLogs(arg0 context.Context, arg1 string, arg2 func(string, string, string), arg3 bool) error {
|
func (m *MockAPI) GetLogs(arg0 context.Context, arg1 string, arg2 func(string, string, string, string), arg3 bool) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "GetLogs", arg0, arg1, arg2, arg3)
|
ret := m.ctrl.Call(m, "GetLogs", arg0, arg1, arg2, arg3)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
|
|
|
@ -805,7 +805,7 @@ func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string, container string, message string), follow bool) error {
|
func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string, service string, container string, message string), follow bool) error {
|
||||||
logGroup := fmt.Sprintf("/docker-compose/%s", name)
|
logGroup := fmt.Sprintf("/docker-compose/%s", name)
|
||||||
var startTime int64
|
var startTime int64
|
||||||
for {
|
for {
|
||||||
|
@ -832,7 +832,7 @@ func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string
|
||||||
|
|
||||||
for _, event := range events.Events {
|
for _, event := range events.Events {
|
||||||
p := strings.Split(aws.StringValue(event.LogStreamName), "/")
|
p := strings.Split(aws.StringValue(event.LogStreamName), "/")
|
||||||
consumer(p[1], p[2], aws.StringValue(event.Message))
|
consumer(p[1], p[1], p[2], aws.StringValue(event.Message))
|
||||||
startTime = *event.IngestionTime
|
startTime = *event.IngestionTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
|
||||||
request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
|
request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
|
||||||
service := pod.Labels[compose.ServiceTag]
|
service := pod.Labels[compose.ServiceTag]
|
||||||
w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
|
w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
|
||||||
consumer.Log(event.Name, event.Source, event.Line)
|
consumer.Log(event.Name, event.Service, event.Source, event.Line)
|
||||||
})
|
})
|
||||||
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
|
||||||
}
|
}
|
||||||
name := getContainerNameWithoutProject(c)
|
name := getContainerNameWithoutProject(c)
|
||||||
w := utils.GetWriter(name, service, c.ID, func(event compose.ContainerEvent) {
|
w := utils.GetWriter(name, service, c.ID, func(event compose.ContainerEvent) {
|
||||||
consumer.Log(name, event.Name, event.Line)
|
consumer.Log(name, event.Service, event.Name, event.Line)
|
||||||
})
|
})
|
||||||
if container.Config.Tty {
|
if container.Config.Tty {
|
||||||
_, err = io.Copy(w, r)
|
_, err = io.Copy(w, r)
|
||||||
|
|
|
@ -43,9 +43,9 @@ type allowListLogConsumer struct {
|
||||||
delegate compose.LogConsumer
|
delegate compose.LogConsumer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *allowListLogConsumer) Log(name, container, message string) {
|
func (a *allowListLogConsumer) Log(name, service, container, message string) {
|
||||||
if a.allowList[name] {
|
if a.allowList[service] {
|
||||||
a.delegate.Log(name, container, message)
|
a.delegate.Log(name, service, container, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue