Merge pull request #1110 from docker/ecs_paginate

handle API pagination
This commit is contained in:
Nicolas De loof 2021-01-07 16:06:04 +01:00 committed by GitHub
commit c6cdfec530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 162 additions and 110 deletions

View File

@ -181,7 +181,7 @@ func (s sdk) GetDefaultVPC(ctx context.Context) (string, error) {
func (s sdk) GetSubNets(ctx context.Context, vpcID string) ([]awsResource, error) { func (s sdk) GetSubNets(ctx context.Context, vpcID string) ([]awsResource, error) {
logrus.Debug("Retrieve SubNets") logrus.Debug("Retrieve SubNets")
ids := []awsResource{} var ids []awsResource
var token *string var token *string
for { for {
subnets, err := s.EC2.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{ subnets, err := s.EC2.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{
@ -453,48 +453,61 @@ func (s sdk) ListStacks(ctx context.Context, name string) ([]compose.Stack, erro
if name != "" { if name != "" {
params.StackName = &name params.StackName = &name
} }
cfStacks, err := s.CF.DescribeStacksWithContext(ctx, &params) var token *string
if err != nil { var stacks []compose.Stack
return nil, err for {
} response, err := s.CF.DescribeStacksWithContext(ctx, &params)
stacks := []compose.Stack{} if err != nil {
for _, stack := range cfStacks.Stacks { return nil, err
for _, t := range stack.Tags { }
if *t.Key == compose.ProjectTag { for _, stack := range response.Stacks {
status := compose.RUNNING for _, t := range stack.Tags {
switch aws.StringValue(stack.StackStatus) { if *t.Key == compose.ProjectTag {
case "CREATE_IN_PROGRESS": status := compose.RUNNING
status = compose.STARTING switch aws.StringValue(stack.StackStatus) {
case "DELETE_IN_PROGRESS": case "CREATE_IN_PROGRESS":
status = compose.REMOVING status = compose.STARTING
case "UPDATE_IN_PROGRESS": case "DELETE_IN_PROGRESS":
status = compose.UPDATING status = compose.REMOVING
default: case "UPDATE_IN_PROGRESS":
status = compose.UPDATING
default:
}
stacks = append(stacks, compose.Stack{
ID: aws.StringValue(stack.StackId),
Name: aws.StringValue(stack.StackName),
Status: status,
})
break
} }
stacks = append(stacks, compose.Stack{
ID: aws.StringValue(stack.StackId),
Name: aws.StringValue(stack.StackName),
Status: status,
})
break
} }
} }
if token == response.NextToken {
return stacks, nil
}
token = response.NextToken
} }
return stacks, nil
} }
func (s sdk) GetStackClusterID(ctx context.Context, stack string) (string, error) { func (s sdk) GetStackClusterID(ctx context.Context, stack string) (string, error) {
// Note: could use DescribeStackResource but we only can detect `does not exist` case by matching string error message // Note: could use DescribeStackResource but we only can detect `does not exist` case by matching string error message
resources, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{ var token *string
StackName: aws.String(stack), for {
}) response, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
if err != nil { StackName: aws.String(stack),
return "", err })
} if err != nil {
for _, r := range resources.StackResourceSummaries { return "", err
if aws.StringValue(r.ResourceType) == "AWS::ECS::Cluster" {
return aws.StringValue(r.PhysicalResourceId), nil
} }
for _, r := range response.StackResourceSummaries {
if aws.StringValue(r.ResourceType) == "AWS::ECS::Cluster" {
return aws.StringValue(r.PhysicalResourceId), nil
}
}
if token == response.NextToken {
break
}
token = response.NextToken
} }
// stack is using user-provided cluster // stack is using user-provided cluster
res, err := s.CF.GetTemplateSummaryWithContext(ctx, &cloudformation.GetTemplateSummaryInput{ res, err := s.CF.GetTemplateSummaryWithContext(ctx, &cloudformation.GetTemplateSummaryInput{
@ -522,19 +535,27 @@ type templateMetadata struct {
func (s sdk) GetServiceTaskDefinition(ctx context.Context, cluster string, serviceArns []string) (map[string]string, error) { func (s sdk) GetServiceTaskDefinition(ctx context.Context, cluster string, serviceArns []string) (map[string]string, error) {
defs := map[string]string{} defs := map[string]string{}
svc := []*string{} svc := []*string{}
for _, s := range serviceArns { for _, s := range serviceArns {
svc = append(svc, aws.String(s)) svc = append(svc, aws.String(s))
} }
services, err := s.ECS.DescribeServicesWithContext(ctx, &ecs.DescribeServicesInput{ for i := 0; i < len(svc); i += 10 {
Cluster: aws.String(cluster), end := i + 10
Services: svc, if end > len(svc) {
}) end = len(svc)
if err != nil { }
return nil, err chunk := svc[i:end]
} services, err := s.ECS.DescribeServicesWithContext(ctx, &ecs.DescribeServicesInput{
for _, s := range services.Services { Cluster: aws.String(cluster),
defs[aws.StringValue(s.ServiceArn)] = aws.StringValue(s.TaskDefinition) Services: chunk,
})
if err != nil {
return nil, err
}
for _, s := range services.Services {
defs[aws.StringValue(s.ServiceArn)] = aws.StringValue(s.TaskDefinition)
}
} }
return defs, nil return defs, nil
} }
@ -570,25 +591,32 @@ func (s sdk) GetServiceTasks(ctx context.Context, cluster string, service string
if stopped { if stopped {
state = "STOPPED" state = "STOPPED"
} }
tasks, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{ var token *string
Cluster: aws.String(cluster), var tasks []*ecs.Task
ServiceName: aws.String(service), for {
DesiredStatus: aws.String(state), response, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{
}) Cluster: aws.String(cluster),
if err != nil { ServiceName: aws.String(service),
return nil, err DesiredStatus: aws.String(state),
}
if len(tasks.TaskArns) > 0 {
taskDescriptions, err := s.ECS.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
Cluster: aws.String(cluster),
Tasks: tasks.TaskArns,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return taskDescriptions.Tasks, nil if len(response.TaskArns) > 0 {
taskDescriptions, err := s.ECS.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
Cluster: aws.String(cluster),
Tasks: response.TaskArns,
})
if err != nil {
return nil, err
}
tasks = append(tasks, taskDescriptions.Tasks...)
}
if token == response.NextToken {
return tasks, nil
}
token = response.NextToken
} }
return nil, nil
} }
func (s sdk) GetTaskStoppedReason(ctx context.Context, cluster string, taskArn string) (string, error) { func (s sdk) GetTaskStoppedReason(ctx context.Context, cluster string, taskArn string) (string, error) {
@ -671,24 +699,29 @@ func (resources stackResources) apply(awsType string, fn stackResourceFn) error
} }
func (s sdk) ListStackResources(ctx context.Context, name string) (stackResources, error) { func (s sdk) ListStackResources(ctx context.Context, name string) (stackResources, error) {
// FIXME handle pagination var token *string
res, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{ var resources stackResources
StackName: aws.String(name), for {
}) response, err := s.CF.ListStackResourcesWithContext(ctx, &cloudformation.ListStackResourcesInput{
if err != nil { StackName: aws.String(name),
return nil, err
}
resources := stackResources{}
for _, r := range res.StackResourceSummaries {
resources = append(resources, stackResource{
LogicalID: aws.StringValue(r.LogicalResourceId),
Type: aws.StringValue(r.ResourceType),
ARN: aws.StringValue(r.PhysicalResourceId),
Status: aws.StringValue(r.ResourceStatus),
}) })
if err != nil {
return nil, err
}
for _, r := range response.StackResourceSummaries {
resources = append(resources, stackResource{
LogicalID: aws.StringValue(r.LogicalResourceId),
Type: aws.StringValue(r.ResourceType),
ARN: aws.StringValue(r.PhysicalResourceId),
Status: aws.StringValue(r.ResourceStatus),
})
}
if token == response.NextToken {
return resources, nil
}
token = response.NextToken
} }
return resources, nil
} }
func (s sdk) DeleteStack(ctx context.Context, name string) error { func (s sdk) DeleteStack(ctx context.Context, name string) error {
@ -744,25 +777,32 @@ func (s sdk) InspectSecret(ctx context.Context, id string) (secrets.Secret, erro
func (s sdk) ListSecrets(ctx context.Context) ([]secrets.Secret, error) { func (s sdk) ListSecrets(ctx context.Context) ([]secrets.Secret, error) {
logrus.Debug("List secrets ...") logrus.Debug("List secrets ...")
response, err := s.SM.ListSecrets(&secretsmanager.ListSecretsInput{})
if err != nil {
return nil, err
}
var ls []secrets.Secret var ls []secrets.Secret
for _, sec := range response.SecretList { var token *string
for {
tags := map[string]string{} response, err := s.SM.ListSecrets(&secretsmanager.ListSecretsInput{})
for _, tag := range sec.Tags { if err != nil {
tags[*tag.Key] = *tag.Value return nil, err
} }
ls = append(ls, secrets.Secret{
ID: *sec.ARN, for _, sec := range response.SecretList {
Name: *sec.Name,
Labels: tags, tags := map[string]string{}
}) for _, tag := range sec.Tags {
tags[*tag.Key] = *tag.Value
}
ls = append(ls, secrets.Secret{
ID: *sec.ARN,
Name: *sec.Name,
Labels: tags,
})
}
if token == response.NextToken {
return ls, nil
}
token = response.NextToken
} }
return ls, nil
} }
func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error { func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
@ -967,34 +1007,46 @@ func (s sdk) getURLWithPortMapping(ctx context.Context, targetGroupArns []string
} }
func (s sdk) ListTasks(ctx context.Context, cluster string, family string) ([]string, error) { func (s sdk) ListTasks(ctx context.Context, cluster string, family string) ([]string, error) {
tasks, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{ var token *string
Cluster: aws.String(cluster), var arns []string
Family: aws.String(family), for {
}) response, err := s.ECS.ListTasksWithContext(ctx, &ecs.ListTasksInput{
if err != nil { Cluster: aws.String(cluster),
return nil, err Family: aws.String(family),
})
if err != nil {
return nil, err
}
for _, arn := range response.TaskArns {
arns = append(arns, *arn)
}
if token == response.NextToken {
return arns, nil
}
token = response.NextToken
} }
arns := []string{}
for _, arn := range tasks.TaskArns {
arns = append(arns, *arn)
}
return arns, nil
} }
func (s sdk) GetPublicIPs(ctx context.Context, interfaces ...string) (map[string]string, error) { func (s sdk) GetPublicIPs(ctx context.Context, interfaces ...string) (map[string]string, error) {
desc, err := s.EC2.DescribeNetworkInterfaces(&ec2.DescribeNetworkInterfacesInput{ var token *string
NetworkInterfaceIds: aws.StringSlice(interfaces),
})
if err != nil {
return nil, err
}
publicIPs := map[string]string{} publicIPs := map[string]string{}
for _, interf := range desc.NetworkInterfaces { for {
if interf.Association != nil { response, err := s.EC2.DescribeNetworkInterfaces(&ec2.DescribeNetworkInterfacesInput{
publicIPs[aws.StringValue(interf.NetworkInterfaceId)] = aws.StringValue(interf.Association.PublicIp) NetworkInterfaceIds: aws.StringSlice(interfaces),
})
if err != nil {
return nil, err
} }
for _, interf := range response.NetworkInterfaces {
if interf.Association != nil {
publicIPs[aws.StringValue(interf.NetworkInterfaceId)] = aws.StringValue(interf.Association.PublicIp)
}
}
if token == response.NextToken {
return publicIPs, nil
}
token = response.NextToken
} }
return publicIPs, nil
} }
func (s sdk) ResolveLoadBalancer(ctx context.Context, nameOrarn string) (awsResource, string, error) { func (s sdk) ResolveLoadBalancer(ctx context.Context, nameOrarn string) (awsResource, string, error) {