Skip to content

Commit 8b10215

Browse files
rajiv-kvdblock
andauthored
Enabling term version check on local state for all ClusterManager Read Transport Actions (opensearch-project#14273) (opensearch-project#14869)
* enabling term version check on local state for all admin read actions Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com> Signed-off-by: Daniel (dB.) Doubrovkine <dblock@amazon.com> Co-authored-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
1 parent 788a7fe commit 8b10215

23 files changed

+382
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
2727
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
2828
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
29+
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
2930

3031
### Dependencies
3132
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))

server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@
8484
import org.opensearch.action.search.SearchResponse;
8585
import org.opensearch.action.search.SearchTransportService;
8686
import org.opensearch.action.search.SearchType;
87+
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
88+
import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest;
8789
import org.opensearch.action.support.replication.TransportReplicationActionTests;
8890
import org.opensearch.action.termvectors.MultiTermVectorsAction;
8991
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
@@ -195,6 +197,7 @@ public void cleanUp() {
195197
}
196198

197199
public void testGetFieldMappings() {
200+
198201
String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]";
199202
interceptTransportActions(getFieldMappingsShardAction);
200203

@@ -545,13 +548,14 @@ public void testDeleteIndex() {
545548
}
546549

547550
public void testGetMappings() {
548-
interceptTransportActions(GetMappingsAction.NAME);
549-
551+
interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME);
550552
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
551553
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();
552554

553555
clearInterceptedActions();
554-
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);
556+
557+
assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class);
558+
assertNoActionInvocation(GetMappingsAction.NAME);
555559
}
556560

557561
public void testPutMapping() {
@@ -565,8 +569,8 @@ public void testPutMapping() {
565569
}
566570

567571
public void testGetSettings() {
568-
interceptTransportActions(GetSettingsAction.NAME);
569572

573+
interceptTransportActions(GetSettingsAction.NAME);
570574
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
571575
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();
572576

@@ -662,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
662666
}
663667
}
664668

669+
private static void assertActionInvocation(String action, Class<? extends TransportRequest> requestClass) {
670+
List<TransportRequest> requests = consumeTransportRequests(action);
671+
assertFalse(requests.isEmpty());
672+
for (TransportRequest internalRequest : requests) {
673+
assertTrue(internalRequest.getClass() == requestClass);
674+
}
675+
}
676+
677+
private static void assertNoActionInvocation(String... actions) {
678+
for (String action : actions) {
679+
List<TransportRequest> requests = consumeTransportRequests(action);
680+
assertTrue(requests.isEmpty());
681+
}
682+
}
683+
665684
private static void assertIndicesSubset(List<String> indices, String... actions) {
666685
// indices returned by each bulk shard request need to be a subset of the original indices
667686
for (String action : actions) {
@@ -781,7 +800,6 @@ public List<TransportInterceptor> getTransportInterceptors(
781800
}
782801

783802
private final Set<String> actions = new HashSet<>();
784-
785803
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
786804

787805
@Override
@@ -831,6 +849,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
831849
}
832850
}
833851
requestHandler.messageReceived(request, channel, task);
852+
834853
}
835854
}
836855
}

server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,19 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
1414
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
15+
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
16+
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
1517
import org.opensearch.client.node.NodeClient;
18+
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
19+
import org.opensearch.cluster.service.ClusterService;
1620
import org.opensearch.common.settings.Settings;
1721
import org.opensearch.common.unit.TimeValue;
1822
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
1923
import org.opensearch.core.rest.RestStatus;
2024
import org.opensearch.node.IoUsageStats;
2125
import org.opensearch.node.ResourceUsageCollectorService;
2226
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
27+
import org.opensearch.plugins.Plugin;
2328
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
2429
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
2530
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
@@ -29,9 +34,13 @@
2934
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
3035
import org.opensearch.test.OpenSearchIntegTestCase;
3136
import org.opensearch.test.rest.FakeRestRequest;
37+
import org.opensearch.test.transport.MockTransportService;
38+
import org.opensearch.transport.TransportService;
3239
import org.junit.Before;
3340

41+
import java.util.Collection;
3442
import java.util.HashMap;
43+
import java.util.List;
3544
import java.util.Map;
3645
import java.util.concurrent.CountDownLatch;
3746
import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +71,10 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
6271
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
6372
.build();
6473

74+
protected Collection<Class<? extends Plugin>> nodePlugins() {
75+
return List.of(MockTransportService.TestPlugin.class);
76+
}
77+
6578
@Before
6679
public void init() {
6780
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
@@ -79,15 +92,34 @@ public void init() {
7992

8093
// Enable admission control
8194
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
95+
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(
96+
TransportService.class,
97+
clusterManagerNode
98+
);
99+
100+
// Force always fetch from ClusterManager
101+
ClusterService clusterService = internalCluster().clusterService();
102+
GetTermVersionResponse oosTerm = new GetTermVersionResponse(
103+
new ClusterStateTermVersion(
104+
clusterService.state().getClusterName(),
105+
clusterService.state().metadata().clusterUUID(),
106+
clusterService.state().term() - 1,
107+
clusterService.state().version() - 1
108+
)
109+
);
110+
primaryService.addRequestHandlingBehavior(
111+
GetTermVersionAction.NAME,
112+
(handler, request, channel, task) -> channel.sendResponse(oosTerm)
113+
);
82114
}
83115

84116
public void testAdmissionControlEnforced() throws Exception {
85117
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));
86118

87119
// Write API on ClusterManager
88120
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));
89-
90121
// Read API on ClusterManager
122+
91123
GetAliasesRequest aliasesRequest = new GetAliasesRequest();
92124
aliasesRequest.aliases("alias1");
93125
try {

server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction(
4848
threadPool,
4949
actionFilters,
5050
GetDecommissionStateRequest::new,
51-
indexNameExpressionResolver
51+
indexNameExpressionResolver,
52+
true
5253
);
5354
}
5455

server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,4 +534,9 @@ private ClusterHealthResponse clusterHealth(
534534
pendingTaskTimeInQueue
535535
);
536536
}
537+
538+
@Override
539+
protected boolean localExecuteSupportedByAction() {
540+
return false;
541+
}
537542
}

server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ public TransportGetRepositoriesAction(
7979
threadPool,
8080
actionFilters,
8181
GetRepositoriesRequest::new,
82-
indexNameExpressionResolver
82+
indexNameExpressionResolver,
83+
true
8384
);
8485
}
8586

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction(
8585
threadPool,
8686
actionFilters,
8787
ClusterSearchShardsRequest::new,
88-
indexNameExpressionResolver
88+
indexNameExpressionResolver,
89+
true
8990
);
9091
this.indicesService = indicesService;
9192
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction(
5555
threadPool,
5656
actionFilters,
5757
ClusterGetWeightedRoutingRequest::new,
58-
indexNameExpressionResolver
58+
indexNameExpressionResolver,
59+
true
5960
);
6061
this.weightedRoutingService = weightedRoutingService;
6162
}

server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public TransportClusterStateAction(
9292
ClusterStateRequest::new,
9393
indexNameExpressionResolver
9494
);
95+
this.localExecuteSupported = true;
9596
}
9697

9798
@Override
@@ -233,9 +234,4 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi
233234

234235
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
235236
}
236-
237-
@Override
238-
protected boolean localExecuteSupportedByAction() {
239-
return true;
240-
}
241237
}

server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public TransportGetStoredScriptAction(
7373
threadPool,
7474
actionFilters,
7575
GetStoredScriptRequest::new,
76-
indexNameExpressionResolver
76+
indexNameExpressionResolver,
77+
true
7778
);
7879
this.scriptService = scriptService;
7980
}

server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,9 @@ protected void clusterManagerOperation(
110110
logger.trace("done fetching pending tasks from cluster service");
111111
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
112112
}
113+
114+
@Override
115+
protected boolean localExecuteSupportedByAction() {
116+
return false;
117+
}
113118
}

server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public TransportGetAliasesAction(
8686
threadPool,
8787
actionFilters,
8888
GetAliasesRequest::new,
89-
indexNameExpressionResolver
89+
indexNameExpressionResolver,
90+
true
9091
);
9192
this.systemIndices = systemIndices;
9293
}

server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public TransportIndicesExistsAction(
7171
threadPool,
7272
actionFilters,
7373
IndicesExistsRequest::new,
74-
indexNameExpressionResolver
74+
indexNameExpressionResolver,
75+
true
7576
);
7677
}
7778

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction(
105105
threadPool,
106106
actionFilters,
107107
IndicesShardStoresRequest::new,
108-
indexNameExpressionResolver
108+
indexNameExpressionResolver,
109+
true
109110
);
110111
this.listShardStoresInfo = listShardStoresInfo;
111112
}

server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction(
7676
threadPool,
7777
actionFilters,
7878
GetComponentTemplateAction.Request::new,
79-
indexNameExpressionResolver
79+
indexNameExpressionResolver,
80+
true
8081
);
8182
}
8283

server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction(
7676
threadPool,
7777
actionFilters,
7878
GetComposableIndexTemplateAction.Request::new,
79-
indexNameExpressionResolver
79+
indexNameExpressionResolver,
80+
true
8081
);
8182
}
8283

server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction(
7676
threadPool,
7777
actionFilters,
7878
GetIndexTemplatesRequest::new,
79-
indexNameExpressionResolver
79+
indexNameExpressionResolver,
80+
true
8081
);
8182
}
8283

server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public GetPipelineTransportAction(
7070
threadPool,
7171
actionFilters,
7272
GetPipelineRequest::new,
73-
indexNameExpressionResolver
73+
indexNameExpressionResolver,
74+
true
7475
);
7576
}
7677

server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction(
4848
threadPool,
4949
actionFilters,
5050
GetSearchPipelineRequest::new,
51-
indexNameExpressionResolver
51+
indexNameExpressionResolver,
52+
true
5253
);
5354
}
5455

0 commit comments

Comments
 (0)