-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Run GeoIp YAML tests in multi-project cluster and fix bug discovered by tests #131521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
17b4038
1f9ab3c
17a144c
e3a9314
4ed662e
20955aa
5f651a3
d4a08b3
92935c8
348494c
ee11eba
9accd8e
0fdf1b0
d2c338c
c611574
31fca85
4953636
892d97d
e281e0a
24cd03c
14f0284
1287fdc
743602e
7089827
f02df24
ce80bbb
d0e8e31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.ingest.geoip; | ||
|
||
import fixture.geoip.GeoIpHttpFixture; | ||
|
||
import com.carrotsearch.randomizedtesting.annotations.Name; | ||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; | ||
|
||
import org.apache.http.entity.ByteArrayEntity; | ||
import org.apache.http.entity.ContentType; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.core.Booleans; | ||
import org.elasticsearch.multiproject.test.MultipleProjectsClientYamlSuiteTestCase; | ||
import org.elasticsearch.test.cluster.ElasticsearchCluster; | ||
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; | ||
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; | ||
import org.elasticsearch.xcontent.XContentBuilder; | ||
import org.elasticsearch.xcontent.json.JsonXContent; | ||
import org.junit.Before; | ||
import org.junit.ClassRule; | ||
import org.junit.rules.RuleChain; | ||
import org.junit.rules.TestRule; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.hamcrest.Matchers.containsInAnyOrder; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
|
||
public class IngestGeoIpClientMultiProjectYamlTestSuiteIT extends MultipleProjectsClientYamlSuiteTestCase { | ||
|
||
private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false; | ||
|
||
private static GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture); | ||
|
||
private static ElasticsearchCluster cluster = ElasticsearchCluster.local() | ||
.module("reindex") | ||
.module("ingest-geoip") | ||
.systemProperty("ingest.geoip.downloader.enabled.default", "true") | ||
// sets the plain (geoip.elastic.co) downloader endpoint, which is used in these tests | ||
.setting("ingest.geoip.downloader.endpoint", () -> fixture.getAddress(), s -> useFixture) | ||
// also sets the enterprise downloader maxmind endpoint, to make sure we do not accidentally hit the real endpoint from tests | ||
// note: it's not important that the downloading actually work at this point -- the rest tests (so far) don't exercise | ||
// the downloading code because of license reasons -- but if they did, then it would be important that we're hitting a fixture | ||
.systemProperty("ingest.geoip.downloader.maxmind.endpoint.default", () -> fixture.getAddress(), s -> useFixture) | ||
.setting("test.multi_project.enabled", "true") | ||
.setting("xpack.license.self_generated.type", "trial") | ||
.user(USER, PASS) | ||
.build(); | ||
|
||
@ClassRule | ||
public static TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster); | ||
|
||
@Override | ||
protected String getTestRestCluster() { | ||
return cluster.getHttpAddresses(); | ||
} | ||
|
||
public IngestGeoIpClientMultiProjectYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { | ||
super(testCandidate); | ||
} | ||
|
||
@ParametersFactory | ||
public static Iterable<Object[]> parameters() throws Exception { | ||
return ESClientYamlSuiteTestCase.createParameters(); | ||
} | ||
|
||
@Before | ||
public void waitForDatabases() throws Exception { | ||
putGeoipPipeline(); | ||
assertBusy(() -> { | ||
Request request = new Request("GET", "/_ingest/geoip/stats"); | ||
Map<String, Object> response = entityAsMap(client().performRequest(request)); | ||
// assert databases are downloaded | ||
Map<?, ?> downloadStats = (Map<?, ?>) response.get("stats"); | ||
assertThat(downloadStats.get("databases_count"), equalTo(4)); | ||
// assert databases are loaded to node | ||
Map<?, ?> nodes = (Map<?, ?>) response.get("nodes"); | ||
assertThat(nodes.size(), equalTo(1)); | ||
Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next(); | ||
List<?> databases = ((List<?>) node.get("databases")); | ||
assertThat(databases, notNullValue()); | ||
List<String> databaseNames = databases.stream().map(o -> (String) ((Map<?, ?>) o).get("name")).toList(); | ||
assertThat( | ||
databaseNames, | ||
containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb", "MyCustomGeoLite2-City.mmdb") | ||
); | ||
}, 10, TimeUnit.SECONDS); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the same as the method in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good point. But I was struggling to get methods from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, so I guess you might be able to declare a I dunno, maybe this isn't worth the effort WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Turns out we have this |
||
|
||
/** | ||
* This creates a pipeline with a geoip processor so that the GeoipDownloader will download its databases. | ||
* @throws IOException | ||
*/ | ||
private void putGeoipPipeline() throws IOException { | ||
final BytesReference bytes; | ||
try (XContentBuilder builder = JsonXContent.contentBuilder()) { | ||
builder.startObject(); | ||
{ | ||
builder.startArray("processors"); | ||
{ | ||
builder.startObject(); | ||
{ | ||
builder.startObject("geoip"); | ||
{ | ||
builder.field("field", "ip"); | ||
builder.field("target_field", "ip-city"); | ||
builder.field("database_file", "GeoLite2-City.mmdb"); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endArray(); | ||
} | ||
builder.endObject(); | ||
bytes = BytesReference.bytes(builder); | ||
} | ||
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/pipeline-with-geoip"); | ||
putPipelineRequest.setEntity(new ByteArrayEntity(bytes.array(), ContentType.APPLICATION_JSON)); | ||
client().performRequest(putPipelineRequest); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment about commoning up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reused code from |
||
|
||
} |
nielsbauman marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -289,103 +289,102 @@ void checkDatabases(ClusterState state) { | |||||
|
||||||
// Optimization: only load the .geoip_databases for projects that are allocated to this node | ||||||
for (ProjectMetadata projectMetadata : state.metadata().projects().values()) { | ||||||
ProjectId projectId = projectMetadata.id(); | ||||||
checkDatabases(state, projectMetadata); | ||||||
} | ||||||
samxbr marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
|
||||||
PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); | ||||||
if (persistentTasks == null) { | ||||||
logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); | ||||||
continue; | ||||||
} | ||||||
void checkDatabases(ClusterState state, ProjectMetadata projectMetadata) { | ||||||
ProjectId projectId = projectMetadata.id(); | ||||||
PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); | ||||||
if (persistentTasks == null) { | ||||||
logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); | ||||||
return; | ||||||
} | ||||||
|
||||||
IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); | ||||||
if (databasesAbstraction == null) { | ||||||
logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); | ||||||
IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); | ||||||
if (databasesAbstraction == null) { | ||||||
logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); | ||||||
return; | ||||||
} else { | ||||||
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index | ||||||
Index databasesIndex = databasesAbstraction.getWriteIndex(); | ||||||
IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex); | ||||||
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { | ||||||
logger.trace( | ||||||
"Not checking databases because geoip databases index does not have all active primary shards for" + " project [{}]", | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit:
Suggested change
|
||||||
projectId | ||||||
); | ||||||
return; | ||||||
samxbr marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} else { | ||||||
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index | ||||||
Index databasesIndex = databasesAbstraction.getWriteIndex(); | ||||||
IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex); | ||||||
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { | ||||||
logger.trace( | ||||||
"Not checking databases because geoip databases index does not have all active primary shards for" | ||||||
+ " project [{}]", | ||||||
projectId | ||||||
); | ||||||
return; | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// we'll consult each of the geoip downloaders to build up a list of database metadatas to work with | ||||||
List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>(); | ||||||
// we'll consult each of the geoip downloaders to build up a list of database metadatas to work with | ||||||
List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>(); | ||||||
|
||||||
// process the geoip task state for the (ordinary) geoip downloader | ||||||
{ | ||||||
GeoIpTaskState taskState = getGeoIpTaskState( | ||||||
projectMetadata, | ||||||
getTaskId(projectId, projectResolver.supportsMultipleProjects()) | ||||||
); | ||||||
if (taskState == null) { | ||||||
// Note: an empty state will purge stale entries in databases map | ||||||
taskState = GeoIpTaskState.EMPTY; | ||||||
} | ||||||
validMetadatas.addAll( | ||||||
taskState.getDatabases() | ||||||
.entrySet() | ||||||
.stream() | ||||||
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) | ||||||
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) | ||||||
.toList() | ||||||
); | ||||||
// process the geoip task state for the (ordinary) geoip downloader | ||||||
{ | ||||||
GeoIpTaskState taskState = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects())); | ||||||
if (taskState == null) { | ||||||
// Note: an empty state will purge stale entries in databases map | ||||||
taskState = GeoIpTaskState.EMPTY; | ||||||
} | ||||||
validMetadatas.addAll( | ||||||
taskState.getDatabases() | ||||||
.entrySet() | ||||||
.stream() | ||||||
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) | ||||||
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) | ||||||
.toList() | ||||||
); | ||||||
} | ||||||
|
||||||
// process the geoip task state for the enterprise geoip downloader | ||||||
{ | ||||||
EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state); | ||||||
if (taskState == null) { | ||||||
// Note: an empty state will purge stale entries in databases map | ||||||
taskState = EnterpriseGeoIpTaskState.EMPTY; | ||||||
} | ||||||
validMetadatas.addAll( | ||||||
taskState.getDatabases() | ||||||
.entrySet() | ||||||
.stream() | ||||||
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) | ||||||
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) | ||||||
.toList() | ||||||
); | ||||||
// process the geoip task state for the enterprise geoip downloader | ||||||
{ | ||||||
EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state); | ||||||
if (taskState == null) { | ||||||
// Note: an empty state will purge stale entries in databases map | ||||||
taskState = EnterpriseGeoIpTaskState.EMPTY; | ||||||
} | ||||||
validMetadatas.addAll( | ||||||
taskState.getDatabases() | ||||||
.entrySet() | ||||||
.stream() | ||||||
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) | ||||||
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) | ||||||
.toList() | ||||||
); | ||||||
} | ||||||
|
||||||
// run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task | ||||||
// has downloaded a new version of the databases | ||||||
validMetadatas.forEach(e -> { | ||||||
String name = e.v1(); | ||||||
GeoIpTaskState.Metadata metadata = e.v2(); | ||||||
DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); | ||||||
String remoteMd5 = metadata.md5(); | ||||||
String localMd5 = reference != null ? reference.getMd5() : null; | ||||||
if (Objects.equals(localMd5, remoteMd5)) { | ||||||
logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); | ||||||
return; | ||||||
} | ||||||
// run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task | ||||||
// has downloaded a new version of the databases | ||||||
validMetadatas.forEach(e -> { | ||||||
String name = e.v1(); | ||||||
GeoIpTaskState.Metadata metadata = e.v2(); | ||||||
DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); | ||||||
String remoteMd5 = metadata.md5(); | ||||||
String localMd5 = reference != null ? reference.getMd5() : null; | ||||||
if (Objects.equals(localMd5, remoteMd5)) { | ||||||
logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); | ||||||
return; | ||||||
} | ||||||
|
||||||
try { | ||||||
retrieveAndUpdateDatabase(projectId, name, metadata); | ||||||
} catch (Exception ex) { | ||||||
logger.error(() -> "failed to retrieve database [" + name + "]", ex); | ||||||
} | ||||||
}); | ||||||
|
||||||
// TODO perhaps we need to handle the license flap persistent task state better than we do | ||||||
// i think the ideal end state is that we *do not* drop the files that the enterprise downloader | ||||||
// handled if they fall out -- which means we need to track that in the databases map itself | ||||||
|
||||||
// start with the list of all databases we currently know about in this service, | ||||||
// then drop the ones that didn't check out as valid from the task states | ||||||
if (databases.containsKey(projectId)) { | ||||||
Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet()); | ||||||
staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); | ||||||
removeStaleEntries(projectId, staleDatabases); | ||||||
try { | ||||||
retrieveAndUpdateDatabase(projectId, name, metadata); | ||||||
} catch (Exception ex) { | ||||||
logger.error(() -> "failed to retrieve database [" + name + "]", ex); | ||||||
} | ||||||
}); | ||||||
|
||||||
// TODO perhaps we need to handle the license flap persistent task state better than we do | ||||||
// i think the ideal end state is that we *do not* drop the files that the enterprise downloader | ||||||
// handled if they fall out -- which means we need to track that in the databases map itself | ||||||
|
||||||
// start with the list of all databases we currently know about in this service, | ||||||
// then drop the ones that didn't check out as valid from the task states | ||||||
if (databases.containsKey(projectId)) { | ||||||
Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet()); | ||||||
staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); | ||||||
removeStaleEntries(projectId, staleDatabases); | ||||||
} | ||||||
} | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm probably missing something, but I'm having trouble identifying why we need this dedicated YAML test suite. Can you explain why we can't just run the regular YAML suite in MP mode (i.e.
tests.multi_project.enabled=true
)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we'd want to run the yaml tests in both MP enabled and disabled mode, is there a way to do that with the existing test suite?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree on that
Once ES-12094 gets implemented, that should be trivial. However, I'm not familiar enough with Gradle to know if there's a simple way to do that right now. Perhaps you can ask in
#es-delivery
- mainly for an answer to this question and potentially also a status update on the ticket itself.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing out the issue. Until that is implemented, I think keeping the MP YAML tests in
qa/multi-project
Gradle project together with other MP Java rest tests is more organized. I also asked in the linked Slack channel to see if there's a better way of doing this now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for asking in the channel. If we don't find a better way to do it, I think I'd be more in favor of not running the MP YAML tests in the CI yet. I don't think these changes are that small and especially if we're combining them with the fix itself, it'll be annoying to revert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure; leaving the MP YAML tests out of CI seems like missing test coverage, considering the amount of code change that has been made to make GeoIP project-aware (especially since there's already a bug found by these tests). Although MP is not in use yet, so it's probably not a huge deal if there's a bug.
Why do we want to revert? If these tests fail in CI then they should be muted, and we can fix them instead of reverting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we're missing test coverage if we don't run this GeoIP YAML suite in MP mode, but there are other suites that we're not covering yet either, and, most importantly, we're not close to running any of this in production. So, I think we can live without coverage a little longer.
Once the Jira ticket I linked gets implemented, all these changes become redundant, and we can just do something like
or whatever the gradle lines will look like to make a YAML test suite run in MP mode as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, I have added a
FixForMultiProject
as reminder. For this specific PR, I think there's value including the tests suite as it validates the bug has been fixed. We can always remove it after the linked ticket is implemented.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the annotation. I still don't really think it's worth merging these changes, but I also don't think it's worth discussing this any longer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Niels, really appreciate your thought on this. I also don't think it's worth spending more time discussing on this. I agree that this YAML test change is not effortless, we can defer making similar changes for other modules until ES-12094 is implemented.