Merge pull request #305 from docker/feat-stream-aci-logs

Follow logs on ACI.
This commit is contained in:
Djordje Lukic 2020-06-30 17:49:00 +02:00 committed by GitHub
commit 614a62b388
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 92 additions and 35 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
@ -29,6 +30,7 @@ import (
tm "github.com/buger/goterm"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/morikuni/aec"
"github.com/pkg/errors"
"github.com/docker/api/azure/login"
@ -234,6 +236,39 @@ func getACIContainerLogs(ctx context.Context, aciContext store.AciContext, conta
return *logs.Content, err
}
func streamLogs(ctx context.Context, aciContext store.AciContext, containerGroupName, containerName string, out io.Writer) error {
lastOutput := 0
for {
select {
case <-ctx.Done():
return nil
default:
logs, err := getACIContainerLogs(ctx, aciContext, containerGroupName, containerName, nil)
if err != nil {
return err
}
logLines := strings.Split(logs, "\n")
currentOutput := len(logLines)
// Note: a backend should not do this normally, this breaks the log
// streaming over gRPC but this is the only thing we can do with
// the kind of logs ACI is giving us. Hopefully Azue will give us
// a real logs streaming api soon.
b := aec.EmptyBuilder
b = b.Up(uint(lastOutput))
fmt.Fprint(out, b.Column(0).ANSI)
for i := 0; i < currentOutput-1; i++ {
fmt.Fprintln(out, logLines[i])
}
lastOutput = currentOutput - 1
time.Sleep(2 * time.Second)
}
}
}
func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {
containerGroupsClient := containerinstance.NewContainerGroupsClient(subscriptionID)
err := setupClient(&containerGroupsClient.Client)

View File

@ -24,12 +24,8 @@ import (
"strconv"
"strings"
"github.com/Azure/go-autorest/autorest/to"
"github.com/docker/api/context/cloud"
"github.com/docker/api/errdefs"
"github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
"github.com/Azure/go-autorest/autorest/to"
"github.com/compose-spec/compose-go/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -40,7 +36,9 @@ import (
"github.com/docker/api/compose"
"github.com/docker/api/containers"
apicontext "github.com/docker/api/context"
"github.com/docker/api/context/cloud"
"github.com/docker/api/context/store"
"github.com/docker/api/errdefs"
)
const singleContainerName = "single--container--aci"
@ -238,6 +236,10 @@ func (cs *aciContainerService) Logs(ctx context.Context, containerName string, r
groupName, containerAciName := getGroupAndContainerName(containerName)
var tail *int32
if req.Follow {
return streamLogs(ctx, cs.ctx, groupName, containerAciName, req.Writer)
}
if req.Tail != "all" {
reqTail, err := strconv.Atoi(req.Tail)
if err != nil {

View File

@ -102,11 +102,11 @@ func (login AzureLoginService) TestLoginFromServicePrincipal(clientID string, cl
spToken, err := creds.ServicePrincipalToken()
if err != nil {
return errors.Wrapf(errdefs.ErrLoginFailed, "could not login with service principal: %s", err)
return errors.Wrapf(errdefs.ErrLoginFailed, "could not login with service principal: %s", err)
}
err = spToken.Refresh()
if err != nil {
return errors.Wrapf(errdefs.ErrLoginFailed, "could not login with service principal: %s", err)
return errors.Wrapf(errdefs.ErrLoginFailed, "could not login with service principal: %s", err)
}
token, err := spToOAuthToken(spToken.Token())
if err != nil {

View File

@ -23,9 +23,7 @@ import (
"os"
"strings"
"testing"
"github.com/docker/api/azure"
"github.com/docker/api/azure/login"
"time"
"github.com/Azure/azure-sdk-for-go/profiles/2019-03-01/resources/mgmt/resources"
azure_storage "github.com/Azure/azure-sdk-for-go/profiles/2019-03-01/storage/mgmt/storage"
@ -35,6 +33,8 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/docker/api/azure"
"github.com/docker/api/azure/login"
"github.com/docker/api/context/store"
"github.com/docker/api/tests/aci-e2e/storage"
. "github.com/docker/api/tests/framework"
@ -57,7 +57,7 @@ type E2eACISuite struct {
}
func (s *E2eACISuite) TestContextDefault() {
It("should be initialized with default context", func() {
s.T().Run("should be initialized with default context", func(t *testing.T) {
_, err := s.NewCommand("docker", "context", "rm", "-f", contextName).Exec()
if err == nil {
log.Println("Cleaning existing test context")
@ -71,7 +71,7 @@ func (s *E2eACISuite) TestContextDefault() {
}
func (s *E2eACISuite) TestACIBackend() {
It("Logs in azure using service principal credentials", func() {
s.T().Run("Logs in azure using service principal credentials", func(t *testing.T) {
login, err := login.NewAzureLoginService()
Expect(err).To(BeNil())
// in order to create new service principal and get these 3 values : `az ad sp create-for-rbac --name 'TestServicePrincipal' --sdk-auth`
@ -82,7 +82,7 @@ func (s *E2eACISuite) TestACIBackend() {
Expect(err).To(BeNil())
})
It("creates a new aci context for tests", func() {
s.T().Run("creates a new aci context for tests", func(t *testing.T) {
setupTestResourceGroup(resourceGroupName)
helper := azure.NewACIResourceGroupHelper()
models, err := helper.GetSubscriptionIDs(context.TODO())
@ -90,12 +90,11 @@ func (s *E2eACISuite) TestACIBackend() {
subscriptionID = *models[0].SubscriptionID
s.NewDockerCommand("context", "create", "aci", contextName, "--subscription-id", subscriptionID, "--resource-group", resourceGroupName, "--location", location).ExecOrDie()
// Expect(output).To(ContainSubstring("ACI context acitest created"))
})
defer deleteResourceGroup(resourceGroupName)
It("uses the aci context", func() {
s.T().Run("uses the aci context", func(t *testing.T) {
currentContext := s.NewCommand("docker", "context", "use", contextName).ExecOrDie()
Expect(currentContext).To(ContainSubstring(contextName))
output := s.NewCommand("docker", "context", "ls").ExecOrDie()
@ -107,7 +106,8 @@ func (s *E2eACISuite) TestACIBackend() {
Expect(len(Lines(output))).To(Equal(1))
})
It("runs nginx on port 80", func() {
var nginxExposedURL string
s.T().Run("runs nginx on port 80", func(t *testing.T) {
aciContext := store.AciContext{
SubscriptionID: subscriptionID,
Location: location,
@ -138,15 +138,34 @@ func (s *E2eACISuite) TestACIBackend() {
containerID := containerFields[0]
Expect(exposedIP).To(ContainSubstring(":80->80/tcp"))
publishedURL := strings.ReplaceAll(exposedIP, "->80/tcp", "")
output = s.NewCommand("curl", publishedURL).ExecOrDie()
nginxExposedURL = strings.ReplaceAll(exposedIP, "->80/tcp", "")
output = s.NewCommand("curl", nginxExposedURL).ExecOrDie()
Expect(output).To(ContainSubstring(testFileContent))
output = s.NewDockerCommand("logs", containerID).ExecOrDie()
Expect(output).To(ContainSubstring("GET"))
})
It("removes container nginx", func() {
s.T().Run("follow logs from nginx", func(t *testing.T) {
ctx := s.NewDockerCommand("logs", "--follow", testContainerName).WithTimeout(time.NewTimer(5 * time.Second).C)
outChan := make(chan string)
go func() {
output, _ := ctx.Exec()
outChan <- output
}()
// Give the `logs --follow` a little time to get the first burst of logs
time.Sleep(1 * time.Second)
s.NewCommand("curl", nginxExposedURL+"/test").ExecOrDie()
output := <-outChan
Expect(output).To(ContainSubstring("/test"))
})
s.T().Run("removes container nginx", func(t *testing.T) {
output := s.NewDockerCommand("rm", testContainerName).ExecOrDie()
Expect(Lines(output)[0]).To(Equal(testContainerName))
})
@ -156,9 +175,9 @@ func (s *E2eACISuite) TestACIBackend() {
const composeFileMultiplePorts = "../composefiles/aci-demo/aci_demo_multi_port.yaml"
const serverContainer = "acidemo_web"
const wordsContainer = "acidemo_words"
It("deploys a compose app", func() {
s.T().Run("deploys a compose app", func(t *testing.T) {
s.NewDockerCommand("compose", "up", "-f", composeFile, "--project-name", "acidemo").ExecOrDie()
// Expect(output).To(ContainSubstring("Successfully deployed"))
output := s.NewDockerCommand("ps").ExecOrDie()
Lines := Lines(output)
Expect(len(Lines)).To(Equal(4))
@ -183,12 +202,12 @@ func (s *E2eACISuite) TestACIBackend() {
Expect(webChecked).To(BeTrue())
})
It("get logs from web service", func() {
s.T().Run("get logs from web service", func(t *testing.T) {
output := s.NewDockerCommand("logs", serverContainer).ExecOrDie()
Expect(output).To(ContainSubstring("Listening on port 80"))
})
It("updates a compose app", func() {
s.T().Run("updates a compose app", func(t *testing.T) {
s.NewDockerCommand("compose", "up", "-f", composeFileMultiplePorts, "--project-name", "acidemo").ExecOrDie()
// Expect(output).To(ContainSubstring("Successfully deployed"))
output := s.NewDockerCommand("ps").ExecOrDie()
@ -224,15 +243,16 @@ func (s *E2eACISuite) TestACIBackend() {
Expect(wordsChecked).To(BeTrue())
})
It("shutdown compose app", func() {
s.T().Run("shutdown compose app", func(t *testing.T) {
s.NewDockerCommand("compose", "down", "-f", composeFile, "--project-name", "acidemo").ExecOrDie()
})
It("switches back to default context", func() {
s.T().Run("switches back to default context", func(t *testing.T) {
output := s.NewCommand("docker", "context", "use", "default").ExecOrDie()
Expect(output).To(ContainSubstring("default"))
})
It("deletes test context", func() {
s.T().Run("deletes test context", func(t *testing.T) {
output := s.NewCommand("docker", "context", "rm", contextName).ExecOrDie()
Expect(output).To(ContainSubstring(contextName))
})

View File

@ -27,7 +27,7 @@ import (
"time"
"github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)
func (b CmdContext) makeCmd() *exec.Cmd {
@ -97,7 +97,7 @@ func (b CmdContext) WithStdinReader(reader io.Reader) *CmdContext {
// ExecOrDie runs a docker command.
func (b CmdContext) ExecOrDie() string {
str, err := b.Exec()
log.Debugf("stdout: %s", str)
logrus.Debugf("stdout: %s", str)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return str
}
@ -150,7 +150,7 @@ func Execute(cmd *exec.Cmd, timeout <-chan time.Time) (string, error) {
cmd.Stdout = mergeWriter(cmd.Stdout, &stdout)
cmd.Stderr = mergeWriter(cmd.Stderr, &stderr)
log.Infof("Execute '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately
logrus.Infof("Execute '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately
if err := cmd.Start(); err != nil {
return "", fmt.Errorf("error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v", cmd, stdout.String(), stderr.String(), err)
}
@ -161,20 +161,20 @@ func Execute(cmd *exec.Cmd, timeout <-chan time.Time) (string, error) {
select {
case err := <-errCh:
if err != nil {
log.Debugf("%s %s failed: %v", cmd.Path, strings.Join(cmd.Args[1:], " "), err)
logrus.Debugf("%s %s failed: %v", cmd.Path, strings.Join(cmd.Args[1:], " "), err)
return stderr.String(), fmt.Errorf("error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v", cmd, stdout.String(), stderr.String(), err)
}
case <-timeout:
log.Debugf("%s %s timed-out", cmd.Path, strings.Join(cmd.Args[1:], " "))
logrus.Debugf("%s %s timed-out", cmd.Path, strings.Join(cmd.Args[1:], " "))
if err := terminateProcess(cmd); err != nil {
return "", err
}
return "", fmt.Errorf(
return stdout.String(), fmt.Errorf(
"timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v",
cmd.Args, stdout.String(), stderr.String())
}
if stderr.String() != "" {
log.Debugf("stderr: %s", stderr.String())
logrus.Debugf("stderr: %s", stderr.String())
}
return stdout.String(), nil
}

View File

@ -21,7 +21,7 @@ import (
"strings"
"github.com/robpike/filter"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)
func nonEmptyString(s string) bool {
@ -54,5 +54,5 @@ func IsWindows() bool {
// It runs func
func It(description string, test func()) {
test()
log.Print("Passed: ", description)
logrus.Print("Passed: ", description)
}