mirror of https://github.com/docker/compose.git
Merge pull request #1305 from docker/join_split
This commit is contained in:
commit
220b508d53
|
@ -805,7 +805,7 @@ func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s sdk) GetLogs(ctx context.Context, name string, consumer func(service string, container string, message string), follow bool) error {
|
||||
func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string, container string, message string), follow bool) error {
|
||||
logGroup := fmt.Sprintf("/docker-compose/%s", name)
|
||||
var startTime int64
|
||||
for {
|
||||
|
|
|
@ -105,7 +105,9 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
|
|||
for _, pod := range pods.Items {
|
||||
request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
|
||||
service := pod.Labels[compose.ServiceTag]
|
||||
w := utils.GetWriter(service, pod.Name, consumer)
|
||||
w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
|
||||
consumer.Log(event.Name, event.Source, event.Line)
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
r, err := request.Stream(ctx)
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
"github.com/docker/compose-cli/api/compose"
|
||||
convert "github.com/docker/compose-cli/local/moby"
|
||||
"github.com/docker/compose-cli/utils"
|
||||
|
||||
"github.com/compose-spec/compose-go/types"
|
||||
moby "github.com/docker/docker/api/types"
|
||||
|
@ -62,7 +63,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
|
|||
|
||||
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.ContainerEventListener, project *types.Project) error {
|
||||
serviceName := container.Labels[serviceLabel]
|
||||
w := getWriter(getContainerNameWithoutProject(container), serviceName, container.ID, consumer)
|
||||
w := utils.GetWriter(getContainerNameWithoutProject(container), serviceName, container.ID, consumer)
|
||||
|
||||
service, err := project.GetService(serviceName)
|
||||
if err != nil {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package compose
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
|
||||
|
@ -75,7 +74,10 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w := utils.GetWriter(service, getContainerNameWithoutProject(c), consumer)
|
||||
name := getContainerNameWithoutProject(c)
|
||||
w := utils.GetWriter(name, service, c.ID, func(event compose.ContainerEvent) {
|
||||
consumer.Log(event.Service, event.Name, event.Line)
|
||||
})
|
||||
if container.Config.Tty {
|
||||
_, err = io.Copy(w, r)
|
||||
} else {
|
||||
|
@ -86,36 +88,3 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
|
|||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
type splitBuffer struct {
|
||||
name string
|
||||
container string
|
||||
consumer compose.ContainerEventListener
|
||||
service string
|
||||
}
|
||||
|
||||
// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
|
||||
func getWriter(name, service, container string, events compose.ContainerEventListener) io.Writer {
|
||||
return splitBuffer{
|
||||
name: name,
|
||||
service: service,
|
||||
container: container,
|
||||
consumer: events,
|
||||
}
|
||||
}
|
||||
|
||||
func (s splitBuffer) Write(b []byte) (n int, err error) {
|
||||
split := bytes.Split(b, []byte{'\n'})
|
||||
for _, line := range split {
|
||||
if len(line) != 0 {
|
||||
s.consumer(compose.ContainerEvent{
|
||||
Type: compose.ContainerEventLog,
|
||||
Name: s.name,
|
||||
Service: s.service,
|
||||
Source: s.container,
|
||||
Line: string(line),
|
||||
})
|
||||
}
|
||||
}
|
||||
return len(b), nil
|
||||
}
|
||||
|
|
|
@ -23,15 +23,6 @@ import (
|
|||
"github.com/docker/compose-cli/api/compose"
|
||||
)
|
||||
|
||||
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
|
||||
func GetWriter(service, container string, l compose.LogConsumer) io.Writer {
|
||||
return splitBuffer{
|
||||
service: service,
|
||||
container: container,
|
||||
consumer: l,
|
||||
}
|
||||
}
|
||||
|
||||
// FilteredLogConsumer filters logs for given services
|
||||
func FilteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer {
|
||||
if len(services) == 0 {
|
||||
|
@ -52,36 +43,63 @@ type allowListLogConsumer struct {
|
|||
delegate compose.LogConsumer
|
||||
}
|
||||
|
||||
func (a *allowListLogConsumer) Log(service, container, message string) {
|
||||
if a.allowList[service] {
|
||||
a.delegate.Log(service, container, message)
|
||||
func (a *allowListLogConsumer) Log(name, container, message string) {
|
||||
if a.allowList[name] {
|
||||
a.delegate.Log(name, container, message)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *allowListLogConsumer) Status(service, container, message string) {
|
||||
if a.allowList[service] {
|
||||
a.delegate.Status(service, container, message)
|
||||
func (a *allowListLogConsumer) Status(name, container, message string) {
|
||||
if a.allowList[name] {
|
||||
a.delegate.Status(name, container, message)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *allowListLogConsumer) Register(service string, source string) {
|
||||
if a.allowList[service] {
|
||||
a.delegate.Register(service, source)
|
||||
func (a *allowListLogConsumer) Register(name string, source string) {
|
||||
if a.allowList[name] {
|
||||
a.delegate.Register(name, source)
|
||||
}
|
||||
}
|
||||
|
||||
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
|
||||
func GetWriter(name, service, container string, events compose.ContainerEventListener) io.Writer {
|
||||
return &splitBuffer{
|
||||
buffer: bytes.Buffer{},
|
||||
name: name,
|
||||
service: service,
|
||||
container: container,
|
||||
consumer: events,
|
||||
}
|
||||
}
|
||||
|
||||
type splitBuffer struct {
|
||||
buffer bytes.Buffer
|
||||
name string
|
||||
service string
|
||||
container string
|
||||
consumer compose.LogConsumer
|
||||
consumer compose.ContainerEventListener
|
||||
}
|
||||
|
||||
func (s splitBuffer) Write(b []byte) (n int, err error) {
|
||||
split := bytes.Split(b, []byte{'\n'})
|
||||
for _, line := range split {
|
||||
if len(line) != 0 {
|
||||
s.consumer.Log(s.service, s.container, string(line))
|
||||
}
|
||||
// Write implements io.Writer. joins all input, splits on the separator and yields each chunk
|
||||
func (s *splitBuffer) Write(b []byte) (int, error) {
|
||||
n, err := s.buffer.Write(b)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
return len(b), nil
|
||||
for {
|
||||
b = s.buffer.Bytes()
|
||||
index := bytes.Index(b, []byte{'\n'})
|
||||
if index < 0 {
|
||||
break
|
||||
}
|
||||
line := s.buffer.Next(index + 1)
|
||||
s.consumer(compose.ContainerEvent{
|
||||
Type: compose.ContainerEventLog,
|
||||
Name: s.name,
|
||||
Service: s.service,
|
||||
Source: s.container,
|
||||
Line: string(line[:len(line)-1]),
|
||||
})
|
||||
}
|
||||
return n, nil
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
Copyright 2020 Docker Compose CLI authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"gotest.tools/v3/assert"
|
||||
|
||||
"github.com/docker/compose-cli/api/compose"
|
||||
)
|
||||
|
||||
func TestSplitWriter(t *testing.T) {
|
||||
var lines []string
|
||||
w := GetWriter("name", "service", "container", func(event compose.ContainerEvent) {
|
||||
lines = append(lines, event.Line)
|
||||
})
|
||||
w.Write([]byte("h")) //nolint: errcheck
|
||||
w.Write([]byte("e")) //nolint: errcheck
|
||||
w.Write([]byte("l")) //nolint: errcheck
|
||||
w.Write([]byte("l")) //nolint: errcheck
|
||||
w.Write([]byte("o")) //nolint: errcheck
|
||||
w.Write([]byte("\n")) //nolint: errcheck
|
||||
w.Write([]byte("world!\n")) //nolint: errcheck
|
||||
assert.DeepEqual(t, lines, []string{"hello", "world!"})
|
||||
|
||||
}
|
Loading…
Reference in New Issue