Skip to content

Commit 5e9da64

Browse files
committed
Enabled default throttling for all tasks submitted to cluster manager (opensearch-project#17711)
Signed-off-by: Manik Garg <gargmanik1317@gmail.com> (cherry picked from commit 6ce0628)
1 parent 134b6a5 commit 5e9da64

30 files changed

+317
-234
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
77
### Added
88
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
99
- Unset discovery nodes for every transport node actions request ([#17682](https://github.com/opensearch-project/OpenSearch/pull/17682))
10+
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))
1011

1112
### Changed
1213
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/WorkloadGroupPersistenceService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@
4141
import java.util.stream.Collectors;
4242

4343
import static org.opensearch.cluster.metadata.WorkloadGroup.updateExistingWorkloadGroup;
44+
import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_QUERY_GROUP;
45+
import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_QUERY_GROUP;
46+
import static org.opensearch.cluster.service.ClusterManagerTask.UPDATE_QUERY_GROUP;
4447

4548
/**
4649
* This class defines the functions for WorkloadGroup persistence
4750
*/
4851
public class WorkloadGroupPersistenceService {
4952
static final String SOURCE = "query-group-persistence-service";
50-
private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group";
51-
private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group";
52-
private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group";
5353
private static final Logger logger = LogManager.getLogger(WorkloadGroupPersistenceService.class);
5454
/**
5555
* max WorkloadGroup count setting name
@@ -94,9 +94,9 @@ public WorkloadGroupPersistenceService(
9494
final ClusterSettings clusterSettings
9595
) {
9696
this.clusterService = clusterService;
97-
this.createWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true);
98-
this.deleteWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true);
99-
this.updateWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true);
97+
this.createWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP, true);
98+
this.deleteWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP, true);
99+
this.updateWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP, true);
100100
setMaxWorkloadGroupCount(MAX_QUERY_GROUP_COUNT.get(settings));
101101
clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxWorkloadGroupCount);
102102
}

server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.opensearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
5454
import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
5555
import org.opensearch.cluster.routing.allocation.command.AllocationCommand;
56-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
5756
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
5857
import org.opensearch.cluster.service.ClusterService;
5958
import org.opensearch.common.Priority;
@@ -70,6 +69,8 @@
7069
import java.util.List;
7170
import java.util.Map;
7271

72+
import static org.opensearch.cluster.service.ClusterManagerTask.CLUSTER_REROUTE_API;
73+
7374
/**
7475
* Transport action for rerouting cluster allocation commands
7576
*
@@ -102,7 +103,7 @@ public TransportClusterRerouteAction(
102103
);
103104
this.allocationService = allocationService;
104105
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
105-
clusterRerouteTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_REROUTE_API_KEY, true);
106+
clusterRerouteTaskKey = clusterService.registerClusterManagerTask(CLUSTER_REROUTE_API, true);
106107
}
107108

108109
@Override

server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.opensearch.cluster.node.DiscoveryNode;
4848
import org.opensearch.cluster.node.DiscoveryNodes;
4949
import org.opensearch.cluster.routing.allocation.AllocationService;
50-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
5150
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
5251
import org.opensearch.cluster.service.ClusterService;
5352
import org.opensearch.common.Nullable;
@@ -65,6 +64,7 @@
6564
import java.io.IOException;
6665

6766
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
67+
import static org.opensearch.cluster.service.ClusterManagerTask.CLUSTER_UPDATE_SETTINGS;
6868
import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;
6969

7070
/**
@@ -108,7 +108,7 @@ public TransportClusterUpdateSettingsAction(
108108
this.clusterSettings = clusterSettings;
109109

110110
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
111-
clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_UPDATE_SETTINGS_KEY, true);
111+
clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(CLUSTER_UPDATE_SETTINGS, true);
112112

113113
}
114114

server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.opensearch.cluster.block.ClusterBlockException;
4040
import org.opensearch.cluster.block.ClusterBlockLevel;
4141
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
42-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
4342
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
4443
import org.opensearch.cluster.service.ClusterService;
4544
import org.opensearch.common.inject.Inject;
@@ -51,6 +50,8 @@
5150

5251
import java.io.IOException;
5352

53+
import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_SCRIPT;
54+
5455
/**
5556
* Transport action for deleting stored script
5657
*
@@ -81,7 +82,7 @@ public TransportDeleteStoredScriptAction(
8182
);
8283
this.scriptService = scriptService;
8384
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
84-
deleteScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SCRIPT_KEY, true);
85+
deleteScriptTaskKey = clusterService.registerClusterManagerTask(DELETE_SCRIPT, true);
8586
}
8687

8788
@Override

server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.opensearch.cluster.block.ClusterBlockException;
4040
import org.opensearch.cluster.block.ClusterBlockLevel;
4141
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
42-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
4342
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
4443
import org.opensearch.cluster.service.ClusterService;
4544
import org.opensearch.common.inject.Inject;
@@ -51,6 +50,8 @@
5150

5251
import java.io.IOException;
5352

53+
import static org.opensearch.cluster.service.ClusterManagerTask.PUT_SCRIPT;
54+
5455
/**
5556
* Transport action for putting stored script
5657
*
@@ -81,7 +82,7 @@ public TransportPutStoredScriptAction(
8182
);
8283
this.scriptService = scriptService;
8384
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
84-
putScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SCRIPT_KEY, true);
85+
putScriptTaskKey = clusterService.registerClusterManagerTask(PUT_SCRIPT, true);
8586
}
8687

8788
@Override

server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
5050
import org.opensearch.cluster.metadata.MetadataCreateIndexService;
5151
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
52-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
5352
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
5453
import org.opensearch.cluster.service.ClusterService;
5554
import org.opensearch.common.Priority;
@@ -62,6 +61,8 @@
6261
import java.io.IOException;
6362
import java.util.concurrent.atomic.AtomicReference;
6463

64+
import static org.opensearch.cluster.service.ClusterManagerTask.AUTO_CREATE;
65+
6566
/**
6667
* Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist.
6768
*
@@ -104,7 +105,7 @@ public TransportAction(
104105
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
105106

106107
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
107-
autoCreateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.AUTO_CREATE_KEY, true);
108+
autoCreateTaskKey = clusterService.registerClusterManagerTask(AUTO_CREATE, true);
108109
}
109110

110111
@Override

server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.opensearch.cluster.metadata.IndexMetadata;
5252
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
5353
import org.opensearch.cluster.metadata.Metadata;
54-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
5554
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
5655
import org.opensearch.cluster.service.ClusterService;
5756
import org.opensearch.common.inject.Inject;
@@ -67,6 +66,8 @@
6766
import java.util.List;
6867
import java.util.stream.Collectors;
6968

69+
import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_DANGLING_INDEX;
70+
7071
/**
7172
* Implements the deletion of a dangling index. When handling a {@link DeleteDanglingIndexAction},
7273
* this class first checks that such a dangling index exists. It then submits a cluster state update
@@ -105,7 +106,7 @@ public TransportDeleteDanglingIndexAction(
105106
this.settings = settings;
106107
this.nodeClient = nodeClient;
107108
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
108-
deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_DANGLING_INDEX_KEY, true);
109+
deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(DELETE_DANGLING_INDEX, true);
109110
}
110111

111112
@Override

server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
5050
import org.opensearch.cluster.metadata.Metadata;
5151
import org.opensearch.cluster.metadata.MetadataDeleteIndexService;
52-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
5352
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
5453
import org.opensearch.cluster.service.ClusterService;
5554
import org.opensearch.common.Priority;
@@ -75,6 +74,7 @@
7574
import java.util.Set;
7675

7776
import static org.opensearch.action.ValidateActions.addValidationError;
77+
import static org.opensearch.cluster.service.ClusterManagerTask.REMOVE_DATA_STREAM;
7878

7979
/**
8080
* Transport action for deleting a datastream
@@ -186,7 +186,7 @@ public TransportAction(
186186
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
187187
this.deleteIndexService = deleteIndexService;
188188
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
189-
removeDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_DATA_STREAM_KEY, true);
189+
removeDataStreamTaskKey = clusterService.registerClusterManagerTask(REMOVE_DATA_STREAM, true);
190190
}
191191

192192
@Override

server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.opensearch.cluster.metadata.IndexMetadata;
4848
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
4949
import org.opensearch.cluster.metadata.Metadata;
50-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
5150
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
5251
import org.opensearch.cluster.service.ClusterService;
5352
import org.opensearch.common.Nullable;
@@ -71,6 +70,8 @@
7170
import java.util.Optional;
7271
import java.util.stream.Collectors;
7372

73+
import static org.opensearch.cluster.service.ClusterManagerTask.ROLLOVER_INDEX;
74+
7475
/**
7576
* Main class to swap the index pointed to by an alias, given some conditions
7677
*
@@ -106,7 +107,7 @@ public TransportRolloverAction(
106107
this.client = client;
107108
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
108109
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
109-
rolloverIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.ROLLOVER_INDEX_KEY, true);
110+
rolloverIndexTaskKey = clusterService.registerClusterManagerTask(ROLLOVER_INDEX, true);
110111
}
111112

112113
@Override

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.opensearch.cluster.ClusterState;
4444
import org.opensearch.cluster.ack.ClusterStateUpdateRequest;
4545
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
46-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
4746
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
4847
import org.opensearch.cluster.service.ClusterService;
4948
import org.opensearch.common.Priority;
@@ -63,6 +62,8 @@
6362
import java.util.Map;
6463
import java.util.concurrent.atomic.AtomicReference;
6564

65+
import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_DATA_STREAM;
66+
6667
/**
6768
* Creates a data stream of metadata
6869
*
@@ -86,7 +87,7 @@ public MetadataCreateDataStreamService(
8687
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
8788
this.metadataCreateIndexService = metadataCreateIndexService;
8889
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
89-
createDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_DATA_STREAM_KEY, true);
90+
createDataStreamTaskKey = clusterService.registerClusterManagerTask(CREATE_DATA_STREAM, true);
9091
}
9192

9293
public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, ActionListener<AcknowledgedResponse> finalListener) {

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.opensearch.cluster.routing.ShardRoutingState;
6262
import org.opensearch.cluster.routing.allocation.AllocationService;
6363
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
64-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
6564
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
6665
import org.opensearch.cluster.service.ClusterService;
6766
import org.opensearch.common.Nullable;
@@ -156,6 +155,7 @@
156155
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
157156
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName;
158157
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
158+
import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX;
159159
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
160160
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
161161
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
@@ -225,7 +225,7 @@ public MetadataCreateIndexService(
225225
this.awarenessReplicaBalance = awarenessReplicaBalance;
226226

227227
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
228-
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
228+
createIndexTaskKey = clusterService.registerClusterManagerTask(CREATE_INDEX, true);
229229
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
230230
remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings)
231231
? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier, repositoriesServiceSupplier, settings)

server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.opensearch.cluster.block.ClusterBlocks;
4343
import org.opensearch.cluster.routing.RoutingTable;
4444
import org.opensearch.cluster.routing.allocation.AllocationService;
45-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
4645
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
4746
import org.opensearch.cluster.service.ClusterService;
4847
import org.opensearch.common.Priority;
@@ -62,6 +61,8 @@
6261
import java.util.Map;
6362
import java.util.Set;
6463

64+
import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_INDEX;
65+
6566
/**
6667
* Deletes indices.
6768
*
@@ -84,7 +85,7 @@ public MetadataDeleteIndexService(Settings settings, ClusterService clusterServi
8485
this.allocationService = allocationService;
8586

8687
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
87-
deleteIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_INDEX_KEY, true);
88+
deleteIndexTaskKey = clusterService.registerClusterManagerTask(DELETE_INDEX, true);
8889

8990
}
9091

server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.opensearch.cluster.ClusterState;
3939
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
4040
import org.opensearch.cluster.metadata.AliasAction.NewAliasValidator;
41-
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
4241
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
4342
import org.opensearch.cluster.service.ClusterService;
4443
import org.opensearch.common.Priority;
@@ -62,6 +61,7 @@
6261
import java.util.function.Function;
6362

6463
import static java.util.Collections.emptyList;
64+
import static org.opensearch.cluster.service.ClusterManagerTask.INDEX_ALIASES;
6565
import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;
6666

6767
/**
@@ -97,7 +97,7 @@ public MetadataIndexAliasesService(
9797
this.xContentRegistry = xContentRegistry;
9898

9999
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
100-
indexAliasTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.INDEX_ALIASES_KEY, true);
100+
indexAliasTaskKey = clusterService.registerClusterManagerTask(INDEX_ALIASES, true);
101101

102102
}
103103

0 commit comments

Comments
 (0)