reconcide log's wplit_writer implementations

close #1311

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2021-02-16 10:43:35 +01:00
parent 9097d71009
commit 12a5100b20
6 changed files with 56 additions and 76 deletions

View File

@ -805,7 +805,7 @@ func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
return err return err
} }
func (s sdk) GetLogs(ctx context.Context, name string, consumer func(service string, container string, message string), follow bool) error { func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string, container string, message string), follow bool) error {
logGroup := fmt.Sprintf("/docker-compose/%s", name) logGroup := fmt.Sprintf("/docker-compose/%s", name)
var startTime int64 var startTime int64
for { for {

View File

@ -105,7 +105,9 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
for _, pod := range pods.Items { for _, pod := range pods.Items {
request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow}) request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
service := pod.Labels[compose.ServiceTag] service := pod.Labels[compose.ServiceTag]
w := utils.GetWriter(service, pod.Name, consumer) w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
consumer.Log(event.Name, event.Source, event.Line)
})
eg.Go(func() error { eg.Go(func() error {
r, err := request.Stream(ctx) r, err := request.Stream(ctx)

View File

@ -24,6 +24,7 @@ import (
"github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/compose"
convert "github.com/docker/compose-cli/local/moby" convert "github.com/docker/compose-cli/local/moby"
"github.com/docker/compose-cli/utils"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
moby "github.com/docker/docker/api/types" moby "github.com/docker/docker/api/types"
@ -62,7 +63,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.ContainerEventListener, project *types.Project) error { func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.ContainerEventListener, project *types.Project) error {
serviceName := container.Labels[serviceLabel] serviceName := container.Labels[serviceLabel]
w := getWriter(getContainerNameWithoutProject(container), serviceName, container.ID, consumer) w := utils.GetWriter(getContainerNameWithoutProject(container), serviceName, container.ID, consumer)
service, err := project.GetService(serviceName) service, err := project.GetService(serviceName)
if err != nil { if err != nil {

View File

@ -17,7 +17,6 @@
package compose package compose
import ( import (
"bytes"
"context" "context"
"io" "io"
@ -75,7 +74,10 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
if err != nil { if err != nil {
return err return err
} }
w := utils.GetWriter(service, getContainerNameWithoutProject(c), consumer) name := getContainerNameWithoutProject(c)
w := utils.GetWriter(name, service, c.ID, func(event compose.ContainerEvent) {
consumer.Log(event.Service, event.Name, event.Line)
})
if container.Config.Tty { if container.Config.Tty {
_, err = io.Copy(w, r) _, err = io.Copy(w, r)
} else { } else {
@ -86,46 +88,3 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
} }
return eg.Wait() return eg.Wait()
} }
type splitBuffer struct {
buffer bytes.Buffer
name string
service string
container string
consumer compose.ContainerEventListener
}
// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
func getWriter(name, service, container string, events compose.ContainerEventListener) io.Writer {
return &splitBuffer{
buffer: bytes.Buffer{},
name: name,
service: service,
container: container,
consumer: events,
}
}
// Write implements io.Writer. joins all input, splits on the separator and yields each chunk
func (s *splitBuffer) Write(b []byte) (int, error) {
n, err := s.buffer.Write(b)
if err != nil {
return n, err
}
for {
b = s.buffer.Bytes()
index := bytes.Index(b, []byte{'\n'})
if index < 0 {
break
}
line := s.buffer.Next(index + 1)
s.consumer(compose.ContainerEvent{
Type: compose.ContainerEventLog,
Name: s.name,
Service: s.service,
Source: s.container,
Line: string(line[:len(line)-1]),
})
}
return n, nil
}

View File

@ -23,15 +23,6 @@ import (
"github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/compose"
) )
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
func GetWriter(service, container string, l compose.LogConsumer) io.Writer {
return splitBuffer{
service: service,
container: container,
consumer: l,
}
}
// FilteredLogConsumer filters logs for given services // FilteredLogConsumer filters logs for given services
func FilteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer { func FilteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer {
if len(services) == 0 { if len(services) == 0 {
@ -52,36 +43,63 @@ type allowListLogConsumer struct {
delegate compose.LogConsumer delegate compose.LogConsumer
} }
func (a *allowListLogConsumer) Log(service, container, message string) { func (a *allowListLogConsumer) Log(name, container, message string) {
if a.allowList[service] { if a.allowList[name] {
a.delegate.Log(service, container, message) a.delegate.Log(name, container, message)
} }
} }
func (a *allowListLogConsumer) Status(service, container, message string) { func (a *allowListLogConsumer) Status(name, container, message string) {
if a.allowList[service] { if a.allowList[name] {
a.delegate.Status(service, container, message) a.delegate.Status(name, container, message)
} }
} }
func (a *allowListLogConsumer) Register(service string, source string) { func (a *allowListLogConsumer) Register(name string, source string) {
if a.allowList[service] { if a.allowList[name] {
a.delegate.Register(service, source) a.delegate.Register(name, source)
}
}
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
func GetWriter(name, service, container string, events compose.ContainerEventListener) io.Writer {
return &splitBuffer{
buffer: bytes.Buffer{},
name: name,
service: service,
container: container,
consumer: events,
} }
} }
type splitBuffer struct { type splitBuffer struct {
buffer bytes.Buffer
name string
service string service string
container string container string
consumer compose.LogConsumer consumer compose.ContainerEventListener
} }
func (s splitBuffer) Write(b []byte) (n int, err error) { // Write implements io.Writer. joins all input, splits on the separator and yields each chunk
split := bytes.Split(b, []byte{'\n'}) func (s *splitBuffer) Write(b []byte) (int, error) {
for _, line := range split { n, err := s.buffer.Write(b)
if len(line) != 0 { if err != nil {
s.consumer.Log(s.service, s.container, string(line)) return n, err
}
} }
return len(b), nil for {
b = s.buffer.Bytes()
index := bytes.Index(b, []byte{'\n'})
if index < 0 {
break
}
line := s.buffer.Next(index + 1)
s.consumer(compose.ContainerEvent{
Type: compose.ContainerEventLog,
Name: s.name,
Service: s.service,
Source: s.container,
Line: string(line[:len(line)-1]),
})
}
return n, nil
} }

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package compose package utils
import ( import (
"testing" "testing"
@ -26,7 +26,7 @@ import (
func TestSplitWriter(t *testing.T) { func TestSplitWriter(t *testing.T) {
var lines []string var lines []string
w := getWriter("service", "container", func(event compose.ContainerEvent) { w := GetWriter("name", "service", "container", func(event compose.ContainerEvent) {
lines = append(lines, event.Line) lines = append(lines, event.Line)
}) })
w.Write([]byte("h")) //nolint: errcheck w.Write([]byte("h")) //nolint: errcheck