-
Notifications
You must be signed in to change notification settings - Fork 536
add integration server tests for resources version checks #17312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 9 commits
91754fb
1eae9bb
07b3644
cf94108
6252267
af20d51
b09f302
8efcfac
0b64a86
53bd820
9874d2d
e068415
bf1abc5
cf7f10f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
package integrationservertest | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"regexp" | ||
"testing" | ||
|
||
"github.com/elastic/apm-server/integrationservertest/internal/ech" | ||
"github.com/elastic/go-elasticsearch/v8/typedapi/types" | ||
) | ||
|
||
// See https://github.com/elastic/ingest-dev/issues/5701 | ||
func TestAPMResourcesVersionBug(t *testing.T) { | ||
config, err := parseConfig("upgrade-config.yaml") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// the event.ingested change was introduced in 8.16.3, as | ||
// it only shows if we upgrade a cluster from a version with | ||
// the bug we need to start from 8.16.2. | ||
start := ech.NewVersion(8, 16, 2, "SNAPSHOT") | ||
|
||
// this wraps the usual steps build to add additional checks in between. | ||
// Is a bit brittle, maybe we should consider allowing this as a first class citizen. | ||
buildteststepsForBug := func(t *testing.T, versions ech.Versions, config upgradeTestConfig) []testStep { | ||
steps := buildTestSteps(t, versions, config, false) | ||
|
||
zeroEventIngestedDocs := checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
checkFn: func(i int64) bool { return i == 0 }, | ||
} | ||
noEventIngestedInMappings := checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000001"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if !hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return errors.New("there should be no event.ingested here") | ||
}, | ||
} | ||
someEventIngestedDocs := checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
} | ||
eventIngestedHasMappings := checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return fmt.Errorf("there should be an event.ingested here") | ||
}, | ||
} | ||
if len(versions) == 3 { | ||
eventIngestedHasMappings.indexName = regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000003") | ||
} | ||
|
||
if len(versions) == 2 { // if 2 versions we expect the first to be bugged and the second to be not | ||
return []testStep{ | ||
steps[0], // create | ||
steps[1], // ingest | ||
zeroEventIngestedDocs, | ||
noEventIngestedInMappings, | ||
steps[2], // upgrade | ||
steps[3], // ingest | ||
someEventIngestedDocs, | ||
eventIngestedHasMappings, | ||
} | ||
|
||
} else if len(versions) == 3 { // if 3 versions we expect the first to be bugged on create, the second on upgrade, the third to be not | ||
return []testStep{ | ||
steps[0], // create | ||
steps[1], // ingest | ||
steps[2], // upgrade | ||
steps[3], // ingest | ||
zeroEventIngestedDocs, | ||
noEventIngestedInMappings, | ||
steps[4], // upgrade | ||
steps[5], // upgrade | ||
someEventIngestedDocs, | ||
eventIngestedHasMappings, | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is definitely quite flimsy, I think having a way to tell when 1 "cycle" (create / upgrade -> ingest -> others) is done would be best. Perhaps having the cycles manually defined e.g.
Or some other way to detect it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are suggesting to not use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My bad I wasn't clear enough. Creating them manually works, but we can also change how steps are defined in the runner. In the example above, In your case, since you are inserting the steps in between each "cycle", it would be much less flimsy since you can inject the steps at the end of each cycle. Just an idea, not sure if it would work well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see, I refactored it in a way that I think removes some of the ifs, but did not address this issue directly in I think there is value in doing something you propose if we decide is something we want to introduce but we should take some time to think the feature through, here I'm only interested in a quick way to have additional steps (we could also extract |
||
|
||
return []testStep{} | ||
} | ||
|
||
t.Run("8.16.2 to 8.17.8", func(t *testing.T) { | ||
from := start | ||
to := ech.NewVersion(8, 17, 7, "SNAPSHOT") | ||
versions := []ech.Version{from, to} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: buildteststepsForBug(t, versions, config), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.17.7 to 8.17.8", func(t *testing.T) { | ||
from := ech.NewVersion(8, 17, 7, "SNAPSHOT") | ||
to := vsCache.GetLatestSnapshot(t, "8.17") | ||
versions := []ech.Version{start, from, to} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: buildteststepsForBug(t, versions, config), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.17.8 to 8.18.3", func(t *testing.T) { | ||
from := vsCache.GetLatestSnapshot(t, "8.17") | ||
to := vsCache.GetLatestSnapshot(t, "8.18") | ||
versions := []ech.Version{from, to} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: buildTestSteps(t, versions, config, false), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.18.2 to 8.18.3", func(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this includes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I named this after our test cases for this bug. I don't think we want to merge this PR as it is, with all these test cases for all these versions. This is now useful to ensure our fixes are working but I would make it more general if we want to include it as our regular testing. I think a necessary test case to add is an upgrade test: from the "previous previous" minor (ie in this case 8.16.2) to the previous minor and than to the current minor. Such a test could find issues that only appears with an upgrade, a scenario we don't yet cover. |
||
from := ech.NewVersion(8, 18, 2, "SNAPSHOT") | ||
to := vsCache.GetLatestSnapshot(t, "8.17") | ||
versions := []ech.Version{start, from, to} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: buildteststepsForBug(t, versions, config), | ||
} | ||
runner.Run(t) | ||
}) | ||
|
||
t.Run("8.18.3 to 8.19.0", func(t *testing.T) { | ||
from := vsCache.GetLatestSnapshot(t, "8.18") | ||
to := vsCache.GetLatestSnapshot(t, "8.19") | ||
versions := []ech.Version{start, from, to} | ||
steps := buildTestSteps(t, versions, config, false) | ||
someEventIngestedDocs := checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.ingested", | ||
} | ||
eventIngestedHasMappings := checkMappingStep{ | ||
datastreamname: "traces-apm-default", | ||
indexName: regexp.MustCompile(".ds-traces-apm-default-[0-9.]+-000002"), | ||
checkFn: func(mappings types.TypeMapping) error { | ||
if hasNestedField(mappings, "event.ingested") { | ||
return nil | ||
} | ||
return fmt.Errorf("there should be an event.ingested here") | ||
}, | ||
} | ||
|
||
newSteps := []testStep{ | ||
steps[0], // create | ||
steps[1], // ingest | ||
steps[2], // upgrade | ||
steps[3], // ingest | ||
someEventIngestedDocs, | ||
eventIngestedHasMappings, | ||
steps[4], // upgrade | ||
steps[5], // upgrade | ||
someEventIngestedDocs, | ||
eventIngestedHasMappings, | ||
checkFieldExistsInDocsStep{ | ||
dataStreamName: "traces-apm-default", | ||
fieldName: "event.success_count", | ||
}, | ||
} | ||
runner := testStepsRunner{ | ||
Target: *target, | ||
Steps: newSteps, | ||
} | ||
runner.Run(t) | ||
|
||
}) | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,152 @@ | ||||||
package integrationservertest | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
"regexp" | ||||||
"strings" | ||||||
"testing" | ||||||
|
||||||
"github.com/elastic/go-elasticsearch/v8" | ||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/count" | ||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/types" | ||||||
) | ||||||
|
||||||
// hasNestedField checks if a nested field (dot-separated path) exists in the properties map. | ||||||
func hasNestedField(mappings types.TypeMapping, fieldPath string) bool { | ||||||
parts := strings.Split(fieldPath, ".") | ||||||
current := mappings.Properties | ||||||
// fmt.Println("parts", parts) | ||||||
// fmt.Println("curre", current) | ||||||
for i, part := range parts { | ||||||
// Check if the part exists in the current properties map | ||||||
val, exists := current[part] | ||||||
// fmt.Println("val", val, "parts", i == len(parts)-1) | ||||||
if !exists { | ||||||
return false | ||||||
} | ||||||
// If this is the last part, the field exists | ||||||
if i == len(parts)-1 { | ||||||
return true | ||||||
} | ||||||
// Try to descend into sub-properties (for nested/object fields) | ||||||
fieldMap, ok := val.(*types.ObjectProperty) | ||||||
if !ok { | ||||||
return false | ||||||
} | ||||||
// fmt.Println("fieldMap", fieldMap) | ||||||
subProps := fieldMap.Properties | ||||||
current = subProps | ||||||
} | ||||||
return false | ||||||
} | ||||||
|
||||||
type checkMappingStep struct { | ||||||
datastreamname string | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
indexName *regexp.Regexp | ||||||
checkFn func(mappings types.TypeMapping) error | ||||||
} | ||||||
|
||||||
func (c checkMappingStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) { | ||||||
t.Logf("------ check data stream mappings in %s ------", e.currentVersion()) | ||||||
t.Logf("checking data stream mapping on %s", c.datastreamname) | ||||||
err := c.checkDataStreamMappings(t, ctx, e.esc.TypedClient()) | ||||||
if err != nil { | ||||||
t.Log(err) | ||||||
t.Fail() | ||||||
} | ||||||
t.Log("success") | ||||||
} | ||||||
|
||||||
func (c checkMappingStep) checkDataStreamMappings(t *testing.T, ctx context.Context, es *elasticsearch.TypedClient) error { | ||||||
dataStreamName := c.datastreamname | ||||||
|
||||||
info, err := es.Indices.GetDataStream().Name(dataStreamName).Do(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("Error getting data stream info: %s", err) | ||||||
} | ||||||
is := []string{} | ||||||
for _, v := range info.DataStreams[0].Indices { | ||||||
is = append(is, v.IndexName) | ||||||
} | ||||||
t.Logf("indices: %s", is) | ||||||
|
||||||
var backingIndex string | ||||||
for _, v := range info.DataStreams[0].Indices { | ||||||
if c.indexName.Match([]byte(v.IndexName)) { | ||||||
backingIndex = v.IndexName | ||||||
} | ||||||
} | ||||||
if backingIndex == "" { | ||||||
// collect available indices for easier troubleshooting | ||||||
indices := []string{} | ||||||
for _, v := range info.DataStreams[0].Indices { | ||||||
indices = append(indices, v.IndexName) | ||||||
} | ||||||
return errors.New(fmt.Sprintf("no index matches the filter regexp; filter=%s indices=%s", c.indexName, indices)) | ||||||
} | ||||||
|
||||||
mappingRes, err := es.Indices.GetMapping().Index(backingIndex).Do(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("Error getting mapping: %w", err) | ||||||
} | ||||||
|
||||||
indexMappingRecord, ok := mappingRes[backingIndex] | ||||||
if !ok { | ||||||
return errors.New(fmt.Sprintf("Backing index %s not found in mapping", backingIndex)) | ||||||
} | ||||||
|
||||||
if err := c.checkFn(indexMappingRecord.Mappings); err != nil { | ||||||
return fmt.Errorf("mapping check failed: %w", err) | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
type checkFieldExistsInDocsStep struct { | ||||||
dataStreamName string | ||||||
fieldName string | ||||||
// checkFn can be used to customize the check this step performs on the returned | ||||||
// doc count. By default it checks if the vlaue is greater than 0. | ||||||
checkFn func(i int64) bool | ||||||
} | ||||||
|
||||||
func (s checkFieldExistsInDocsStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) { | ||||||
t.Logf("------ check data stream docs with field %s in %s ------", s.fieldName, e.currentVersion()) | ||||||
count, err := getDocCountWithField(ctx, e.esc.TypedClient(), s.dataStreamName, s.fieldName) | ||||||
if err != nil { | ||||||
t.Log(err) | ||||||
t.Fail() | ||||||
} | ||||||
t.Logf("documents in data stream %s with '%s': %d\n", s.dataStreamName, s.fieldName, count) | ||||||
|
||||||
if s.checkFn == nil { | ||||||
s.checkFn = func(i int64) bool { return i > 0 } | ||||||
} | ||||||
|
||||||
if !s.checkFn(count) { | ||||||
t.Log("checkFieldExistsInDocsStep: check function failed") | ||||||
t.Fail() | ||||||
return | ||||||
} | ||||||
t.Log("check successful") | ||||||
} | ||||||
|
||||||
func getDocCountWithField(ctx context.Context, c *elasticsearch.TypedClient, dataStreamName, fieldName string) (int64, error) { | ||||||
req := &count.Request{ | ||||||
Query: &types.Query{ | ||||||
Exists: &types.ExistsQuery{ | ||||||
Field: fieldName, | ||||||
}, | ||||||
}, | ||||||
} | ||||||
resp, err := c.Core.Count(). | ||||||
Index(dataStreamName). | ||||||
Request(req). | ||||||
Do(ctx) | ||||||
if err != nil { | ||||||
return 0, fmt.Errorf("cannot retrieve doc count: %w", err) | ||||||
} | ||||||
return resp.Count, nil | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.