Skip to content
This repository was archived by the owner on Sep 24, 2021. It is now read-only.

Revert to aws sdk v2 #93

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ env:
- DOCKER_CLI_EXPERIMENTAL=enabled

go:
- "1.14.x"
- "1.15.x"

# Only clone the most recent commit.
git:
Expand Down Expand Up @@ -36,4 +36,4 @@ script:
- docker buildx ls
- docker buildx create --name builder --driver docker-container --use
- docker buildx inspect --bootstrap
- 'if [ "$TRAVIS_PULL_REQUEST" != false ]; then make docker; else travis_wait make docker-multiarch TRAVIS_TAG=$TRAVIS_TAG; fi'
- 'if [ "$TRAVIS_PULL_REQUEST" != false ]; then make docker; else travis_wait make docker-multiarch TRAVIS_TAG=$TRAVIS_TAG; fi'
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.14-alpine as builder
FROM golang:1.15-alpine as builder
RUN apk add -U --no-cache ca-certificates
WORKDIR ${GOPATH}/src/github.com/awslabs/k8s-cloudwatch-adapter
COPY . ./
Expand Down
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
module github.com/awslabs/k8s-cloudwatch-adapter

go 1.14
go 1.15

require (
github.com/aws/aws-sdk-go v1.33.5
github.com/aws/aws-sdk-go-v2 v1.6.0
github.com/aws/aws-sdk-go-v2/config v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.1.1 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.4.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.4.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.4.1 // indirect
github.com/go-sql-driver/mysql v1.4.0 // indirect
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200323093244-5046ce1afe6b
github.com/pkg/errors v0.9.1
gopkg.in/yaml.v2 v2.2.8 // indirect
Expand Down
354 changes: 121 additions & 233 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/metrics/v1alpha1/externalmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type MetricStat struct {
// The period to use when retrieving the metric.
//
// Period is a required field
Period int64 `json:"period"`
Period int32 `json:"period"`

// The statistic to return. It can include any CloudWatch statistic or extended
// statistic.
Expand Down
60 changes: 30 additions & 30 deletions pkg/aws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"os"
"time"

"github.com/aws/aws-sdk-go/aws/credentials/stscreds"

"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/aws/aws-sdk-go-v2/service/sts"

"github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"k8s.io/klog"
)

Expand All @@ -27,39 +27,40 @@ type cloudwatchManager struct {
localRegion string
}

func (c *cloudwatchManager) getClient(role, region *string) *cloudwatch.CloudWatch {
// Using the Config value, create the CloudWatch client
sess := session.Must(session.NewSession())
func (c *cloudwatchManager) getClient(role, region *string) *cloudwatch.Client {
// check if region is set
usedRegion := c.localRegion
if region != nil {
usedRegion = *region
}

// Using the SDK's default configuration, loading additional config
// and credentials values from the environment variables, shared
// credentials, and shared configuration files
cfg := aws.NewConfig().WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(usedRegion))
if err != nil {
panic("unable to load SDK config, " + err.Error())
}

klog.Infof("using AWS Region: %s", cfg.Region)

// check if roleARN is passed
if role != nil {
creds := stscreds.NewCredentials(sess, *role)
cfg = cfg.WithCredentials(creds)
client := sts.NewFromConfig(cfg)
provider := stscreds.NewAssumeRoleProvider(client, *role)
cfg.Credentials = aws.NewCredentialsCache(provider)
klog.Infof("using IAM role ARN: %s", *role)
}

// check if region is set
if region != nil {
cfg = cfg.WithRegion(*region)
} else if aws.StringValue(cfg.Region) == "" {
cfg.Region = aws.String(c.localRegion)
}
klog.Infof("using AWS Region: %s", aws.StringValue(cfg.Region))

if os.Getenv("DEBUG") == "true" {
cfg = cfg.WithLogLevel(aws.LogDebugWithHTTPBody)
cfg.ClientLogMode = aws.LogRequestWithBody | aws.LogResponseWithBody
}

svc := cloudwatch.New(sess, cfg)
return svc
client := cloudwatch.NewFromConfig(cfg)
return client
}

func (c *cloudwatchManager) QueryCloudWatch(request v1alpha1.ExternalMetric) ([]*cloudwatch.MetricDataResult, error) {
func (c *cloudwatchManager) QueryCloudWatch(request v1alpha1.ExternalMetric) ([]types.MetricDataResult, error) {
role := request.Spec.RoleARN
region := request.Spec.Region
cwQuery := toCloudWatchQuery(&request)
Expand All @@ -70,15 +71,14 @@ func (c *cloudwatchManager) QueryCloudWatch(request v1alpha1.ExternalMetric) ([]

cwQuery.EndTime = &endTime
cwQuery.StartTime = &startTime
cwQuery.ScanBy = aws.String("TimestampDescending")
cwQuery.ScanBy = types.ScanByTimestampDescending

req, resp := c.getClient(role, region).GetMetricDataRequest(&cwQuery)
req.SetContext(context.Background())
getMetricDataOutput, err := c.getClient(role, region).GetMetricData(context.Background(), &cwQuery)

if err := req.Send(); err != nil {
if err != nil {
klog.Errorf("err: %v", err)
return []*cloudwatch.MetricDataResult{}, err
return []types.MetricDataResult{}, err
}

return resp.MetricDataResults, nil
return getMetricDataOutput.MetricDataResults, nil
}
4 changes: 2 additions & 2 deletions pkg/aws/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package aws

import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"
)

// CloudWatchManager manages clients for Amazon CloudWatch.
type CloudWatchManager interface {
// Query sends a CloudWatch GetMetricDataInput to CloudWatch API for metric results.
QueryCloudWatch(request v1alpha1.ExternalMetric) ([]*cloudwatch.MetricDataResult, error)
QueryCloudWatch(request v1alpha1.ExternalMetric) ([]types.MetricDataResult, error)
}
38 changes: 20 additions & 18 deletions pkg/aws/util.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,69 @@
package aws

import (
"io/ioutil"
"net/http"
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"

"k8s.io/klog"
)

// GetLocalRegion gets the region ID from the instance metadata.
func GetLocalRegion() string {
resp, err := http.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone/")
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
klog.Errorf("unable to get current region information, %v", err)
klog.Errorf("error: %v", err)
return ""
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
client := imds.NewFromConfig(cfg)
region, err := client.GetRegion(context.TODO(), &imds.GetRegionInput{})
if err != nil {
klog.Errorf("cannot read response from instance metadata, %v", err)
klog.Errorf("Unable to retrieve the region from the EC2 instance %v\n", err)
return ""
}

// strip the last character from AZ to get region ID
return string(body[0 : len(body)-1])
return region.Region
}

func toCloudWatchQuery(externalMetric *v1alpha1.ExternalMetric) cloudwatch.GetMetricDataInput {
queries := externalMetric.Spec.Queries

cwMetricQueries := make([]*cloudwatch.MetricDataQuery, len(queries))
cwMetricQueries := make([]types.MetricDataQuery, len(queries))
for i, q := range queries {
q := q
returnData := &q.ReturnData
mdq := &cloudwatch.MetricDataQuery{
mdq := types.MetricDataQuery{
Id: &q.ID,
Label: &q.Label,
ReturnData: *returnData,
}

if len(q.Expression) == 0 {
dimensions := make([]*cloudwatch.Dimension, len(q.MetricStat.Metric.Dimensions))
dimensions := make([]types.Dimension, len(q.MetricStat.Metric.Dimensions))
for j := range q.MetricStat.Metric.Dimensions {
dimensions[j] = &cloudwatch.Dimension{
dimensions[j] = types.Dimension{
Name: &q.MetricStat.Metric.Dimensions[j].Name,
Value: &q.MetricStat.Metric.Dimensions[j].Value,
}
}

metric := &cloudwatch.Metric{
metric := &types.Metric{
Dimensions: dimensions,
MetricName: &q.MetricStat.Metric.MetricName,
Namespace: &q.MetricStat.Metric.Namespace,
}

mdq.MetricStat = &cloudwatch.MetricStat{
mdq.MetricStat = &types.MetricStat{
Metric: metric,
Period: &q.MetricStat.Period,
Stat: &q.MetricStat.Stat,
Unit: aws.String(q.MetricStat.Unit),
Unit: types.StandardUnit(q.MetricStat.Unit),
}
} else {
mdq.Expression = &q.Expression
Expand Down
3 changes: 1 addition & 2 deletions pkg/aws/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/aws-sdk-go/aws"
api "github.com/awslabs/k8s-cloudwatch-adapter/pkg/apis/metrics/v1alpha1"
)

Expand Down Expand Up @@ -69,7 +68,7 @@ func TestToCloudWatchQuery(t *testing.T) {
t.Errorf("metricRequest Stat = %v, want %v", *qStat.Stat, wantStat.Stat)
}

if aws.StringValue(qStat.Unit) != wantStat.Unit {
if string(qStat.Unit) != wantStat.Unit {
t.Errorf("metricRequest Unit = %v, want %v", qStat.Unit, wantStat.Unit)
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/provider/provider_external.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package provider

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -37,7 +36,7 @@ func (p *cloudwatchProvider) GetExternalMetric(namespace string, metricSelector
if len(metricValue) == 0 || len(metricValue[0].Values) == 0 {
quantity = *resource.NewMilliQuantity(0, resource.DecimalSI)
} else {
quantity = *resource.NewQuantity(int64(aws.Float64Value(metricValue[0].Values[0])), resource.DecimalSI)
quantity = *resource.NewQuantity(int64(metricValue[0].Values[0]), resource.DecimalSI)
}
externalMetricValue := external_metrics.ExternalMetricValue{
MetricName: info.Metric,
Expand Down
44 changes: 22 additions & 22 deletions samples/sqs/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,27 @@ import (
"os/signal"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
util "github.com/awslabs/k8s-cloudwatch-adapter/pkg/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/awslabs/k8s-cloudwatch-adapter/pkg/aws"
)

func main() {
// Using the SDK's default configuration, loading additional config
// and credentials values from the environment variables, shared
// credentials, and shared configuration files
cfg := aws.NewConfig()
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
panic("unable to load SDK config, " + err.Error())
}

if aws.StringValue(cfg.Region) == "" {
cfg.Region = aws.String(util.GetLocalRegion())
if cfg.Region == "" {
cfg.Region = aws.GetLocalRegion()
}
fmt.Println("using AWS Region:", cfg.Region)

svc := sqs.New(session.Must(session.NewSession(cfg)))
fmt.Println("using AWS Region: %s", cfg.Region)

svc := sqs.NewFromConfig(cfg)

// Initialize and create a SQS Queue named helloworld if it doesn't exist
queueName := os.Getenv("QUEUE")
Expand All @@ -33,11 +36,10 @@ func main() {
}
fmt.Println("listening to queue:", queueName)

req, q := svc.GetQueueUrlRequest(&sqs.GetQueueUrlInput{
q, err := svc.GetQueueUrl(context.Background(), &sqs.GetQueueUrlInput{
QueueName: &queueName,
})
req.SetContext(context.Background())
if err := req.Send(); req != nil {
if err != nil {
// handle queue creation error
fmt.Println("cannot get queue:", err)
}
Expand All @@ -49,26 +51,24 @@ func main() {
os.Exit(1)
}()

timeout := int64(20)
timeout := int32(20)

for {
req, msg := svc.ReceiveMessageRequest(&sqs.ReceiveMessageInput{
msg, err := svc.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
QueueUrl: q.QueueUrl,
WaitTimeSeconds: &timeout,
WaitTimeSeconds: timeout,
})
req.SetContext(context.Background())
if err := req.Send(); err == nil {
fmt.Println("message:", msg)
} else {
if err != nil {
fmt.Println("error receiving message from queue:", err)
} else {
fmt.Println("message:", msg)
}
if len(msg.Messages) > 0 {
req, _ := svc.DeleteMessageRequest(&sqs.DeleteMessageInput{
_, err = svc.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: q.QueueUrl,
ReceiptHandle: msg.Messages[0].ReceiptHandle,
})
req.SetContext(context.Background())
if err := req.Send(); err != nil {
if err != nil {
fmt.Println("error deleting message from queue:", err)
}
}
Expand Down
Loading