Skip to content

add integration server tests for resources version checks #17312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions integrationservertest/apm-version-bug_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package integrationservertest

import (
"errors"
"fmt"
"regexp"
"testing"

"github.com/elastic/apm-server/integrationservertest/internal/ech"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

// See https://github.com/elastic/ingest-dev/issues/5701
func TestAPMResourcesVersionBug(t *testing.T) {
config, err := parseConfig("upgrade-config.yaml")
if err != nil {
t.Fatal(err)
}

// the event.ingested change was introduced in 8.16.3, as
// it only shows if we upgrade a cluster from a version with
// the bug we need to start from 8.16.2.
start := ech.NewVersion(8, 16, 2, "SNAPSHOT")

// this wraps the usual steps build to add additional checks in between.
// Is a bit brittle, maybe we should consider allowing this as a first class citizen.
buildteststepsForBug := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
buildteststepsForBug := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep {
buildTestStepsForBug := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep {

steps := buildTestSteps(t, versions, config, false)

zeroEventIngestedDocs := checkFieldExistsInDocsStep{
dataStreamName: "traces-apm-default",
fieldName: "event.ingested",
checkFn: func(i int64) bool { return i == 0 },
}
noEventIngestedInMappings := checkMappingStep{
datastreamname: "traces-apm-default",
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000001"),
checkFn: func(mappings types.TypeMapping) error {
if !hasNestedField(mappings, "event.ingested") {
return nil
}
return errors.New("there should be no event.ingested here")
},
}
someEventIngestedDocs := checkFieldExistsInDocsStep{
dataStreamName: "traces-apm-default",
fieldName: "event.ingested",
}
eventIngestedHasMappings := checkMappingStep{
datastreamname: "traces-apm-default",
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"),
checkFn: func(mappings types.TypeMapping) error {
if hasNestedField(mappings, "event.ingested") {
return nil
}
return fmt.Errorf("there should be an event.ingested here")
},
}
if len(versions) == 3 {
eventIngestedHasMappings.indexName = regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000003")
}

if len(versions) == 2 { // if 2 versions we expect the first to be bugged and the second to be not
return []testStep{
steps[0], // create
steps[1], // ingest
zeroEventIngestedDocs,
noEventIngestedInMappings,
steps[2], // upgrade
steps[3], // ingest
someEventIngestedDocs,
eventIngestedHasMappings,
}

} else if len(versions) == 3 { // if 3 versions we expect the first to be bugged on create, the second on upgrade, the third to be not
return []testStep{
steps[0], // create
steps[1], // ingest
steps[2], // upgrade
steps[3], // ingest
zeroEventIngestedDocs,
noEventIngestedInMappings,
steps[4], // upgrade
steps[5], // upgrade
someEventIngestedDocs,
eventIngestedHasMappings,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely quite flimsy, I think having a way to tell when 1 "cycle" (create / upgrade -> ingest -> others) is done would be best. Perhaps having the cycles manually defined e.g.

[]testCycle{
    {
        createStep,
        ingestStep,
    },
    {
        upgradeStep,
        ingestStep,
     },
     ...
}

Or some other way to detect it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are suggesting to not use the buildTestStep function but create them manually so it's easier to follow the various cycles?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad I wasn't clear enough. Creating them manually works, but we can also change how steps are defined in the runner. In the example above, testCycle is essentially just: type testCycle []testStep, and demarcates where one version ends and the other starts.

In your case, since you are inserting the steps in between each "cycle", it would be much less flimsy since you can inject the steps at the end of each cycle.

Just an idea, not sure if it would work well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, I refactored it in a way that I think removes some of the ifs, but did not address this issue directly in 9874d2d (#17312)

I think there is value in doing something you propose if we decide is something we want to introduce but we should take some time to think the feature through, here I'm only interested in a quick way to have additional steps (we could also extract createStep and upgradeStep "builder" functions from the current one, so they are easier to compose, but I considered it out of scope here)


return []testStep{}
}

t.Run("8.16.2 to 8.17.8", func(t *testing.T) {
from := start
to := ech.NewVersion(8, 17, 7, "SNAPSHOT")
versions := []ech.Version{from, to}
runner := testStepsRunner{
Target: *target,
Steps: buildteststepsForBug(t, versions, config),
}
runner.Run(t)
})

t.Run("8.17.7 to 8.17.8", func(t *testing.T) {
from := ech.NewVersion(8, 17, 7, "SNAPSHOT")
to := vsCache.GetLatestSnapshot(t, "8.17")
versions := []ech.Version{start, from, to}
runner := testStepsRunner{
Target: *target,
Steps: buildteststepsForBug(t, versions, config),
}
runner.Run(t)
})

t.Run("8.17.8 to 8.18.3", func(t *testing.T) {
from := vsCache.GetLatestSnapshot(t, "8.17")
to := vsCache.GetLatestSnapshot(t, "8.18")
versions := []ech.Version{from, to}
runner := testStepsRunner{
Target: *target,
Steps: buildTestSteps(t, versions, config, false),
}
runner.Run(t)
})

t.Run("8.18.2 to 8.18.3", func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this includes start, shouldn't it be "8.16.2 to 8.17.x to 8.18.x"? Also vsCache.GetLatestSnapshot gets the latest patch for the minor if you specify x.y, so the description will be inaccurate when new patches come out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named this after our test cases for this bug. I don't think we want to merge this PR as it is, with all these test cases for all these versions. This is now useful to ensure our fixes are working but I would make it more general if we want to include it as our regular testing.

I think a necessary test case to add is an upgrade test: from the "previous previous" minor (ie in this case 8.16.2) to the previous minor and than to the current minor. Such a test could find issues that only appears with an upgrade, a scenario we don't yet cover.

from := ech.NewVersion(8, 18, 2, "SNAPSHOT")
to := vsCache.GetLatestSnapshot(t, "8.17")
versions := []ech.Version{start, from, to}
runner := testStepsRunner{
Target: *target,
Steps: buildteststepsForBug(t, versions, config),
}
runner.Run(t)
})

t.Run("8.18.3 to 8.19.0", func(t *testing.T) {
from := vsCache.GetLatestSnapshot(t, "8.18")
to := vsCache.GetLatestSnapshot(t, "8.19")
versions := []ech.Version{start, from, to}
steps := buildTestSteps(t, versions, config, false)
someEventIngestedDocs := checkFieldExistsInDocsStep{
dataStreamName: "traces-apm-default",
fieldName: "event.ingested",
}
eventIngestedHasMappings := checkMappingStep{
datastreamname: "traces-apm-default",
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"),
checkFn: func(mappings types.TypeMapping) error {
if hasNestedField(mappings, "event.ingested") {
return nil
}
return fmt.Errorf("there should be an event.ingested here")
},
}

newSteps := []testStep{
steps[0], // create
steps[1], // ingest
steps[2], // upgrade
steps[3], // ingest
someEventIngestedDocs,
eventIngestedHasMappings,
steps[4], // upgrade
steps[5], // upgrade
someEventIngestedDocs,
eventIngestedHasMappings,
checkFieldExistsInDocsStep{
dataStreamName: "traces-apm-default",
fieldName: "event.success_count",
},
}
runner := testStepsRunner{
Target: *target,
Steps: newSteps,
}
runner.Run(t)

})
}
152 changes: 152 additions & 0 deletions integrationservertest/buglib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package integrationservertest

import (
"context"
"errors"
"fmt"
"regexp"
"strings"
"testing"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/count"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

// hasNestedField checks if a nested field (dot-separated path) exists in the properties map.
func hasNestedField(mappings types.TypeMapping, fieldPath string) bool {
parts := strings.Split(fieldPath, ".")
current := mappings.Properties
// fmt.Println("parts", parts)
// fmt.Println("curre", current)
for i, part := range parts {
// Check if the part exists in the current properties map
val, exists := current[part]
// fmt.Println("val", val, "parts", i == len(parts)-1)
if !exists {
return false
}
// If this is the last part, the field exists
if i == len(parts)-1 {
return true
}
// Try to descend into sub-properties (for nested/object fields)
fieldMap, ok := val.(*types.ObjectProperty)
if !ok {
return false
}
// fmt.Println("fieldMap", fieldMap)
subProps := fieldMap.Properties
current = subProps
}
return false
}

type checkMappingStep struct {
datastreamname string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
datastreamname string
dataStreamName string

indexName *regexp.Regexp
checkFn func(mappings types.TypeMapping) error
}

func (c checkMappingStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) {
t.Logf("------ check data stream mappings in %s ------", e.currentVersion())
t.Logf("checking data stream mapping on %s", c.datastreamname)
err := c.checkDataStreamMappings(t, ctx, e.esc.TypedClient())
if err != nil {
t.Log(err)
t.Fail()
}
t.Log("success")
}

func (c checkMappingStep) checkDataStreamMappings(t *testing.T, ctx context.Context, es *elasticsearch.TypedClient) error {
dataStreamName := c.datastreamname

info, err := es.Indices.GetDataStream().Name(dataStreamName).Do(ctx)
if err != nil {
return fmt.Errorf("Error getting data stream info: %s", err)
}
is := []string{}
for _, v := range info.DataStreams[0].Indices {
is = append(is, v.IndexName)
}
t.Logf("indices: %s", is)

var backingIndex string
for _, v := range info.DataStreams[0].Indices {
if c.indexName.Match([]byte(v.IndexName)) {
backingIndex = v.IndexName
}
}
if backingIndex == "" {
// collect available indices for easier troubleshooting
indices := []string{}
for _, v := range info.DataStreams[0].Indices {
indices = append(indices, v.IndexName)
}
return errors.New(fmt.Sprintf("no index matches the filter regexp; filter=%s indices=%s", c.indexName, indices))
}

mappingRes, err := es.Indices.GetMapping().Index(backingIndex).Do(ctx)
if err != nil {
return fmt.Errorf("Error getting mapping: %w", err)
}

indexMappingRecord, ok := mappingRes[backingIndex]
if !ok {
return errors.New(fmt.Sprintf("Backing index %s not found in mapping", backingIndex))
}

if err := c.checkFn(indexMappingRecord.Mappings); err != nil {
return fmt.Errorf("mapping check failed: %w", err)
}

return nil
}

type checkFieldExistsInDocsStep struct {
dataStreamName string
fieldName string
// checkFn can be used to customize the check this step performs on the returned
// doc count. By default it checks if the vlaue is greater than 0.
checkFn func(i int64) bool
}

func (s checkFieldExistsInDocsStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) {
t.Logf("------ check data stream docs with field %s in %s ------", s.fieldName, e.currentVersion())
count, err := getDocCountWithField(ctx, e.esc.TypedClient(), s.dataStreamName, s.fieldName)
if err != nil {
t.Log(err)
t.Fail()
}
t.Logf("documents in data stream %s with '%s': %d\n", s.dataStreamName, s.fieldName, count)

if s.checkFn == nil {
s.checkFn = func(i int64) bool { return i > 0 }
}

if !s.checkFn(count) {
t.Log("checkFieldExistsInDocsStep: check function failed")
t.Fail()
return
}
t.Log("check successful")
}

func getDocCountWithField(ctx context.Context, c *elasticsearch.TypedClient, dataStreamName, fieldName string) (int64, error) {
req := &count.Request{
Query: &types.Query{
Exists: &types.ExistsQuery{
Field: fieldName,
},
},
}
resp, err := c.Core.Count().
Index(dataStreamName).
Request(req).
Do(ctx)
if err != nil {
return 0, fmt.Errorf("cannot retrieve doc count: %w", err)
}
return resp.Count, nil
}
1 change: 1 addition & 0 deletions integrationservertest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/elasticsearch-serverless-go v0.1.1-20231031 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/analysis v0.21.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions integrationservertest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ github.com/elastic/cloud-sdk-go v1.23.0 h1:IH41W2E+GIlHjuPtkNPbpYoACAJ8ffk3QiyDj
github.com/elastic/cloud-sdk-go v1.23.0/go.mod h1:k0ZebhZKX22l6Ysl5Zbpc8VLF54hfwDtHppEEEVUJ04=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/elasticsearch-serverless-go v0.1.1-20231031 h1:6zBFaT5Klj/0zHi/35wfTUhy1lgvf/tC9mStYRW9dYA=
github.com/elastic/elasticsearch-serverless-go v0.1.1-20231031/go.mod h1:6DwhqJIaaVnN8AOAfAG0BRG3EsxoQ4ai/NAOxK20sJE=
github.com/elastic/go-elasticsearch/v8 v8.16.0 h1:f7bR+iBz8GTAVhwyFO3hm4ixsz2eMaEy0QroYnXV3jE=
github.com/elastic/go-elasticsearch/v8 v8.16.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
Expand Down
4 changes: 4 additions & 0 deletions integrationservertest/internal/ech/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func (v Version) String() string {
return fmt.Sprintf("%d.%d.%d%s", v.Major, v.Minor, v.Patch, suffix)
}

func (v Version) MajorMinorPatch() string {
return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
}

func (v Version) MajorMinor() string {
return fmt.Sprintf("%d.%d", v.Major, v.Minor)
}
Expand Down
4 changes: 4 additions & 0 deletions integrationservertest/internal/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func formatDurationElasticsearch(d time.Duration) string {
return fmt.Sprintf("%dnanos", d)
}

func (c *Client) TypedClient() *elasticsearch.TypedClient {
return c.es
}

// CreateAPIKey creates an API Key, and returns it in the base64-encoded form
// that agents should provide.
//
Expand Down
10 changes: 10 additions & 0 deletions integrationservertest/internal/terraform/terraform.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ func (t *Runner) Destroy(ctx context.Context, vars ...tfexec.DestroyOption) erro
}

func (t *Runner) Output(name string, res any) error {
// try to load outputs from the terraform state if
// our cache is empty
if len(t.outputs) == 0 {
outputs, err := t.tf.Output(context.Background())
// only load if output command succeeded
if err == nil {
t.outputs = outputs
}
}

o, ok := t.outputs[name]
if !ok {
return fmt.Errorf("output named %s not found", name)
Expand Down
Loading