diff --git a/CHANGELOG.md b/CHANGELOG.md index e1a670a87b85d..6e9441ace1721 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782)) - Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039)) - Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/)) - +- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711)) ### Changed - Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024)) diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/WorkloadGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/WorkloadGroupPersistenceService.java index 35a7dc7f3219f..8fd5fe5dfcfed 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/WorkloadGroupPersistenceService.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/WorkloadGroupPersistenceService.java @@ -41,15 +41,15 @@ import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.WorkloadGroup.updateExistingWorkloadGroup; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_QUERY_GROUP; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_QUERY_GROUP; +import static org.opensearch.cluster.service.ClusterManagerTask.UPDATE_QUERY_GROUP; /** * This class defines the functions for WorkloadGroup persistence */ public class WorkloadGroupPersistenceService { static final String SOURCE = "query-group-persistence-service"; - private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group"; - private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group"; - private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group"; private static final Logger logger = LogManager.getLogger(WorkloadGroupPersistenceService.class); /** * max WorkloadGroup count setting name @@ -94,9 +94,9 @@ public WorkloadGroupPersistenceService( final ClusterSettings clusterSettings ) { this.clusterService = clusterService; - this.createWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true); - this.deleteWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true); - this.updateWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true); + this.createWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP, true); + this.deleteWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP, true); + this.updateWorkloadGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP, true); setMaxWorkloadGroupCount(MAX_QUERY_GROUP_COUNT.get(settings)); clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxWorkloadGroupCount); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 134583a56f489..adf5dfa9c3aa8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -53,7 +53,6 @@ import org.opensearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocationCommand; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -70,6 +69,8 @@ import java.util.List; import java.util.Map; +import static org.opensearch.cluster.service.ClusterManagerTask.CLUSTER_REROUTE_API; + /** * Transport action for rerouting cluster allocation commands * @@ -102,7 +103,7 @@ public TransportClusterRerouteAction( ); this.allocationService = allocationService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - clusterRerouteTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_REROUTE_API_KEY, true); + clusterRerouteTaskKey = clusterService.registerClusterManagerTask(CLUSTER_REROUTE_API, true); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 60c04d5a620f8..e144eca62b477 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -47,7 +47,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -65,6 +64,7 @@ import java.io.IOException; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.service.ClusterManagerTask.CLUSTER_UPDATE_SETTINGS; import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration; /** @@ -108,7 +108,7 @@ public TransportClusterUpdateSettingsAction( this.clusterSettings = clusterSettings; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_UPDATE_SETTINGS_KEY, true); + clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(CLUSTER_UPDATE_SETTINGS, true); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java index 9688b6659a810..3e8a376f48791 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -51,6 +50,8 @@ import java.io.IOException; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_SCRIPT; + /** * Transport action for deleting stored script * @@ -81,7 +82,7 @@ public TransportDeleteStoredScriptAction( ); this.scriptService = scriptService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - deleteScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SCRIPT_KEY, true); + deleteScriptTaskKey = clusterService.registerClusterManagerTask(DELETE_SCRIPT, true); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java index eb2838cbef5c8..22dfaad19e46a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -51,6 +50,8 @@ import java.io.IOException; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_SCRIPT; + /** * Transport action for putting stored script * @@ -81,7 +82,7 @@ public TransportPutStoredScriptAction( ); this.scriptService = scriptService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SCRIPT_KEY, true); + putScriptTaskKey = clusterService.registerClusterManagerTask(PUT_SCRIPT, true); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java index 35a51bafb2c22..4638c420ddb4d 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; import org.opensearch.cluster.metadata.MetadataCreateIndexService; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -62,6 +61,8 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; +import static org.opensearch.cluster.service.ClusterManagerTask.AUTO_CREATE; + /** * Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist. * @@ -104,7 +105,7 @@ public TransportAction( this.metadataCreateDataStreamService = metadataCreateDataStreamService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - autoCreateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.AUTO_CREATE_KEY, true); + autoCreateTaskKey = clusterService.registerClusterManagerTask(AUTO_CREATE, true); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java index 71123e4b6c315..3ffaa1e7d7b4e 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java @@ -51,7 +51,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -67,6 +66,8 @@ import java.util.List; import java.util.stream.Collectors; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_DANGLING_INDEX; + /** * Implements the deletion of a dangling index. When handling a {@link DeleteDanglingIndexAction}, * this class first checks that such a dangling index exists. It then submits a cluster state update @@ -105,7 +106,7 @@ public TransportDeleteDanglingIndexAction( this.settings = settings; this.nodeClient = nodeClient; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_DANGLING_INDEX_KEY, true); + deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(DELETE_DANGLING_INDEX, true); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 2212974985e3f..9db621b1a5367 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataDeleteIndexService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -75,6 +74,7 @@ import java.util.Set; import static org.opensearch.action.ValidateActions.addValidationError; +import static org.opensearch.cluster.service.ClusterManagerTask.REMOVE_DATA_STREAM; /** * Transport action for deleting a datastream @@ -186,7 +186,7 @@ public TransportAction( super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); this.deleteIndexService = deleteIndexService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - removeDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_DATA_STREAM_KEY, true); + removeDataStreamTaskKey = clusterService.registerClusterManagerTask(REMOVE_DATA_STREAM, true); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java index 3dd67057d5e16..3124484716706 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -47,7 +47,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -71,6 +70,8 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.opensearch.cluster.service.ClusterManagerTask.ROLLOVER_INDEX; + /** * Main class to swap the index pointed to by an alias, given some conditions * @@ -106,7 +107,7 @@ public TransportRolloverAction( this.client = client; this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - rolloverIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.ROLLOVER_INDEX_KEY, true); + rolloverIndexTaskKey = clusterService.registerClusterManagerTask(ROLLOVER_INDEX, true); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java index 95be62fb06dcc..5e7201a17a9eb 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateRequest; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -63,6 +62,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_DATA_STREAM; + /** * Creates a data stream of metadata * @@ -86,7 +87,7 @@ public MetadataCreateDataStreamService( this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); this.metadataCreateIndexService = metadataCreateIndexService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_DATA_STREAM_KEY, true); + createDataStreamTaskKey = clusterService.registerClusterManagerTask(CREATE_DATA_STREAM, true); } public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, ActionListener finalListener) { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index f4cc14bee7c2e..d01d4fc8f3b80 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -61,7 +61,6 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -156,6 +155,7 @@ import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING; import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; @@ -225,7 +225,7 @@ public MetadataCreateIndexService( this.awarenessReplicaBalance = awarenessReplicaBalance; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); + createIndexTaskKey = clusterService.registerClusterManagerTask(CREATE_INDEX, true); Supplier minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion(); remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings) ? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier, repositoriesServiceSupplier, settings) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java index 5352a8a3fb994..6af71c7a5ba32 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java @@ -42,7 +42,6 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -62,6 +61,8 @@ import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_INDEX; + /** * Deletes indices. * @@ -84,7 +85,7 @@ public MetadataDeleteIndexService(Settings settings, ClusterService clusterServi this.allocationService = allocationService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - deleteIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_INDEX_KEY, true); + deleteIndexTaskKey = clusterService.registerClusterManagerTask(DELETE_INDEX, true); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java index 96ba3d60ce9a6..b0bd6d145bfac 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java @@ -38,7 +38,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.metadata.AliasAction.NewAliasValidator; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -62,6 +61,7 @@ import java.util.function.Function; import static java.util.Collections.emptyList; +import static org.opensearch.cluster.service.ClusterManagerTask.INDEX_ALIASES; import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; /** @@ -97,7 +97,7 @@ public MetadataIndexAliasesService( this.xContentRegistry = xContentRegistry; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - indexAliasTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.INDEX_ALIASES_KEY, true); + indexAliasTaskKey = clusterService.registerClusterManagerTask(INDEX_ALIASES, true); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java index de106d14c6fd9..263c491df1c82 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java @@ -45,7 +45,6 @@ import org.opensearch.cluster.applicationtemplates.ClusterStateSystemTemplateLoader; import org.opensearch.cluster.applicationtemplates.SystemTemplateMetadata; import org.opensearch.cluster.applicationtemplates.SystemTemplatesService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -104,6 +103,12 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateIndexTotalPrimaryShardsPerNodeSetting; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogFlushIntervalSettingsForCompositeIndex; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_COMPONENT_TEMPLATE; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX_TEMPLATE; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX_TEMPLATE_V2; +import static org.opensearch.cluster.service.ClusterManagerTask.REMOVE_COMPONENT_TEMPLATE; +import static org.opensearch.cluster.service.ClusterManagerTask.REMOVE_INDEX_TEMPLATE; +import static org.opensearch.cluster.service.ClusterManagerTask.REMOVE_INDEX_TEMPLATE_V2; import static org.opensearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME; import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; @@ -148,18 +153,12 @@ public MetadataIndexTemplateService( this.threadPool = threadPool; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createIndexTemplateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_TEMPLATE_KEY, true); - createIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_TEMPLATE_V2_KEY, true); - removeIndexTemplateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_INDEX_TEMPLATE_KEY, true); - removeIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_INDEX_TEMPLATE_V2_KEY, true); - createComponentTemplateTaskKey = clusterService.registerClusterManagerTask( - ClusterManagerTaskKeys.CREATE_COMPONENT_TEMPLATE_KEY, - true - ); - removeComponentTemplateTaskKey = clusterService.registerClusterManagerTask( - ClusterManagerTaskKeys.REMOVE_COMPONENT_TEMPLATE_KEY, - true - ); + createIndexTemplateTaskKey = clusterService.registerClusterManagerTask(CREATE_INDEX_TEMPLATE, true); + createIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(CREATE_INDEX_TEMPLATE_V2, true); + removeIndexTemplateTaskKey = clusterService.registerClusterManagerTask(REMOVE_INDEX_TEMPLATE, true); + removeIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(REMOVE_INDEX_TEMPLATE_V2, true); + createComponentTemplateTaskKey = clusterService.registerClusterManagerTask(CREATE_COMPONENT_TEMPLATE, true); + removeComponentTemplateTaskKey = clusterService.registerClusterManagerTask(REMOVE_COMPONENT_TEMPLATE, true); } public void removeTemplates(final RemoveRequest request, final RemoveListener listener) { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java index 4f19c216bc436..d85ac58c68b2e 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java @@ -42,7 +42,6 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -68,6 +67,7 @@ import java.util.List; import java.util.Map; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_MAPPING; import static org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; /** @@ -92,7 +92,7 @@ public MetadataMappingService(ClusterService clusterService, IndicesService indi this.indicesService = indicesService; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putMappingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_MAPPING_KEY, true); + putMappingTaskKey = clusterService.registerClusterManagerTask(PUT_MAPPING, true); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 6bc7f5a865d25..3e35ee90dad6c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -47,7 +47,6 @@ import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -84,6 +83,7 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogFlushIntervalSettingsForCompositeIndex; import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findComponentTemplate; import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING; +import static org.opensearch.cluster.service.ClusterManagerTask.UPDATE_SETTINGS; import static org.opensearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX; import static org.opensearch.index.IndexSettings.same; @@ -126,7 +126,7 @@ public MetadataUpdateSettingsService( this.awarenessReplicaBalance = awarenessReplicaBalance; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - updateSettingsTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SETTINGS_KEY, true); + updateSettingsTaskKey = clusterService.registerClusterManagerTask(UPDATE_SETTINGS, true); } public void updateSettings( diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java index ede0985c2b420..4019145e2f823 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java @@ -967,12 +967,12 @@ void onNoLongerClusterManager() { /** * Functionality for register task key to cluster manager node. * - * @param taskKey - task key of task + * @param task - cluster manager task * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not * @return throttling task key which needs to be passed while submitting task to cluster manager */ - public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - return clusterManagerTaskThrottler.registerClusterManagerTask(taskKey, throttlingEnabled); + public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { + return clusterManagerTaskThrottler.registerClusterManagerTask(task, throttlingEnabled); } /** diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTask.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTask.java new file mode 100644 index 0000000000000..a22f0a093cbe1 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTask.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.common.annotation.PublicApi; + +/** + * Task keys and their throttling thresholds for cluster manager operations. + */ +@PublicApi(since = "3.0.0") +public enum ClusterManagerTask { + + // Tasks with default threshold (50) + CREATE_INDEX("create-index", 50), + UPDATE_SETTINGS("update-settings", 50), + CLUSTER_UPDATE_SETTINGS("cluster-update-settings", 50), + DELETE_INDEX("delete-index", 50), + DELETE_DANGLING_INDEX("delete-dangling-index", 50), + CREATE_DATA_STREAM("create-data-stream", 50), + REMOVE_DATA_STREAM("remove-data-stream", 50), + CREATE_INDEX_TEMPLATE("create-index-template", 50), + REMOVE_INDEX_TEMPLATE("remove-index-template", 50), + CREATE_COMPONENT_TEMPLATE("create-component-template", 50), + REMOVE_COMPONENT_TEMPLATE("remove-component-template", 50), + CREATE_INDEX_TEMPLATE_V2("create-index-template-v2", 50), + REMOVE_INDEX_TEMPLATE_V2("remove-index-template-v2", 50), + PUT_PIPELINE("put-pipeline", 50), + DELETE_PIPELINE("delete-pipeline", 50), + PUT_SEARCH_PIPELINE("put-search-pipeline", 50), + DELETE_SEARCH_PIPELINE("delete-search-pipeline", 50), + CREATE_PERSISTENT_TASK("create-persistent-task", 50), + FINISH_PERSISTENT_TASK("finish-persistent-task", 50), + REMOVE_PERSISTENT_TASK("remove-persistent-task", 50), + UPDATE_TASK_STATE("update-task-state", 50), + CREATE_QUERY_GROUP("create-query-group", 50), + DELETE_QUERY_GROUP("delete-query-group", 50), + UPDATE_QUERY_GROUP("update-query-group", 50), + PUT_SCRIPT("put-script", 50), + DELETE_SCRIPT("delete-script", 50), + PUT_REPOSITORY("put-repository", 50), + DELETE_REPOSITORY("delete-repository", 50), + CREATE_SNAPSHOT("create-snapshot", 50), + DELETE_SNAPSHOT("delete-snapshot", 50), + RESTORE_SNAPSHOT("restore-snapshot", 50), + CLUSTER_REROUTE_API("cluster-reroute-api", 50), + + // Tasks with custom thresholds + AUTO_CREATE("auto-create", 200), + ROLLOVER_INDEX("rollover-index", 200), + INDEX_ALIASES("index-aliases", 200), + PUT_MAPPING("put-mapping", 10000), + UPDATE_SNAPSHOT_STATE("update-snapshot-state", 5000); + + private final String key; + private final int threshold; + + ClusterManagerTask(String key, int threshold) { + this.key = key; + this.threshold = threshold; + } + + public String getKey() { + return key; + } + + public int getThreshold() { + return threshold; + } + + public static ClusterManagerTask fromKey(String key) { + for (ClusterManagerTask task : values()) { + if (task.getKey().equals(key)) { + return task; + } + } + throw new IllegalArgumentException("No cluster manager task found for key: " + key); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java deleted file mode 100644 index c88bea56cb9bd..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -/** - * Class for maintaining all cluster manager task key at one place. - */ -public final class ClusterManagerTaskKeys { - - public static final String CREATE_INDEX_KEY = "create-index"; - public static final String UPDATE_SETTINGS_KEY = "update-settings"; - public static final String CLUSTER_UPDATE_SETTINGS_KEY = "cluster-update-settings"; - public static final String AUTO_CREATE_KEY = "auto-create"; - public static final String DELETE_INDEX_KEY = "delete-index"; - public static final String DELETE_DANGLING_INDEX_KEY = "delete-dangling-index"; - public static final String CREATE_DATA_STREAM_KEY = "create-data-stream"; - public static final String REMOVE_DATA_STREAM_KEY = "remove-data-stream"; - public static final String ROLLOVER_INDEX_KEY = "rollover-index"; - public static final String INDEX_ALIASES_KEY = "index-aliases"; - public static final String PUT_MAPPING_KEY = "put-mapping"; - public static final String CREATE_INDEX_TEMPLATE_KEY = "create-index-template"; - public static final String REMOVE_INDEX_TEMPLATE_KEY = "remove-index-template"; - public static final String CREATE_COMPONENT_TEMPLATE_KEY = "create-component-template"; - public static final String REMOVE_COMPONENT_TEMPLATE_KEY = "remove-component-template"; - public static final String CREATE_INDEX_TEMPLATE_V2_KEY = "create-index-template-v2"; - public static final String REMOVE_INDEX_TEMPLATE_V2_KEY = "remove-index-template-v2"; - public static final String PUT_PIPELINE_KEY = "put-pipeline"; - public static final String DELETE_PIPELINE_KEY = "delete-pipeline"; - - public static final String PUT_SEARCH_PIPELINE_KEY = "put-search-pipeline"; - public static final String DELETE_SEARCH_PIPELINE_KEY = "delete-search-pipeline"; - public static final String CREATE_PERSISTENT_TASK_KEY = "create-persistent-task"; - public static final String FINISH_PERSISTENT_TASK_KEY = "finish-persistent-task"; - public static final String REMOVE_PERSISTENT_TASK_KEY = "remove-persistent-task"; - public static final String UPDATE_TASK_STATE_KEY = "update-task-state"; - public static final String PUT_SCRIPT_KEY = "put-script"; - public static final String DELETE_SCRIPT_KEY = "delete-script"; - public static final String PUT_REPOSITORY_KEY = "put-repository"; - public static final String DELETE_REPOSITORY_KEY = "delete-repository"; - public static final String CREATE_SNAPSHOT_KEY = "create-snapshot"; - public static final String DELETE_SNAPSHOT_KEY = "delete-snapshot"; - public static final String UPDATE_SNAPSHOT_STATE_KEY = "update-snapshot-state"; - public static final String RESTORE_SNAPSHOT_KEY = "restore-snapshot"; - public static final String CLUSTER_REROUTE_API_KEY = "cluster-reroute-api"; - -} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 39ce218dd801a..878a5374e0681 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,9 +32,12 @@ * This class does throttling on task submission to cluster manager node, it uses throttling key defined in various executors * as key for throttling. Throttling will be performed over task executor's class level, different task types have different executors class. *

- * Set specific setting to for setting the threshold of throttling of particular task type. + * Set specific setting for setting the threshold of throttling of a particular task type. * e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks, - * Set it to default value(-1) to disable the throttling for this task type. + *

+ * Set it to (-1) to disable the throttling for this task type. + *

+ * All tasks must have a default threshold configured in {@link ClusterManagerTask}. */ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class); @@ -122,13 +126,19 @@ public static TimeValue getMaxDelayForRetry() { * Added retry mechanism in TransportClusterManagerNodeAction, so it would be retried for customer generated tasks. *

* If tasks are not getting retried then we can register with false flag, so user won't be able to configure threshold limits for it. + *

+ * If throttling is enabled, default threshold based on task type will be used if not specified in settings. */ - protected ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - ThrottlingKey throttlingKey = new ThrottlingKey(taskKey, throttlingEnabled); - if (THROTTLING_TASK_KEYS.containsKey(taskKey)) { - throw new IllegalArgumentException("There is already a Throttling key registered with same name: " + taskKey); + protected ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { + ThrottlingKey throttlingKey = new ThrottlingKey(task.getKey(), throttlingEnabled); + if (THROTTLING_TASK_KEYS.containsKey(task.getKey())) { + throw new IllegalArgumentException("There is already a Throttling key registered with same name: " + task.getKey()); + } + THROTTLING_TASK_KEYS.put(task.getKey(), throttlingKey); + + if (throttlingEnabled && Objects.isNull(getThrottlingLimit(task.getKey()))) { + tasksThreshold.put(task.getKey(), (long) task.getThreshold()); } - THROTTLING_TASK_KEYS.put(taskKey, throttlingKey); return throttlingKey; } @@ -176,7 +186,7 @@ void validateSetting(final Settings settings) { if (!THROTTLING_TASK_KEYS.get(key).isThrottlingEnabled()) { throw new IllegalArgumentException("Throttling is not enabled for given task type: " + key); } - int threshold = groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE); + int threshold = groups.get(key).getAsInt("value", ClusterManagerTask.fromKey(key).getThreshold()); if (threshold < MIN_THRESHOLD_VALUE) { throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling"); } @@ -192,7 +202,8 @@ void updateSetting(final Settings newSettings) { settingKeys.addAll(tasksThreshold.keySet()); for (String key : settingKeys) { Settings setting = groups.get(key); - updateLimit(key, setting == null ? MIN_THRESHOLD_VALUE : setting.getAsInt("value", MIN_THRESHOLD_VALUE)); + int defaultThreshold = ClusterManagerTask.fromKey(key).getThreshold(); + updateLimit(key, setting == null ? defaultThreshold : setting.getAsInt("value", defaultThreshold)); } } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index ef27dde622b83..05d478bbb9df1 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -300,12 +300,12 @@ public final String getNodeName() { /** * Functionality for register task key to cluster manager node. * - * @param taskKey - task key of task + * @param task - cluster manager task * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not * @return throttling task key which needs to be passed while submitting task to cluster manager */ - public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - return clusterManagerService.registerClusterManagerTask(taskKey, throttlingEnabled); + public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) { + return clusterManagerService.registerClusterManagerTask(task, throttlingEnabled); } /** diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 9f724f0d07ae5..bc7afa1a7589e 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -55,7 +55,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; @@ -98,6 +97,9 @@ import java.util.function.IntConsumer; import java.util.stream.Collectors; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_PIPELINE; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_PIPELINE; + /** * Holder class for several ingest related services. * @@ -170,8 +172,8 @@ public IngestService( this.threadPool = threadPool; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_PIPELINE_KEY, true); - deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true); + putPipelineTaskKey = clusterService.registerClusterManagerTask(PUT_PIPELINE, true); + deletePipelineTaskKey = clusterService.registerClusterManagerTask(DELETE_PIPELINE, true); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_NUMBER_OF_INGEST_PROCESSORS, this::setMaxIngestProcessorCount); setMaxIngestProcessorCount(clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS)); } diff --git a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java index 4e38fb34dbf17..eb187224ebf07 100644 --- a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java @@ -44,7 +44,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; @@ -61,6 +60,11 @@ import java.io.Closeable; import java.util.Objects; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_PERSISTENT_TASK; +import static org.opensearch.cluster.service.ClusterManagerTask.FINISH_PERSISTENT_TASK; +import static org.opensearch.cluster.service.ClusterManagerTask.REMOVE_PERSISTENT_TASK; +import static org.opensearch.cluster.service.ClusterManagerTask.UPDATE_TASK_STATE; + /** * Component that runs only on the cluster-manager node and is responsible for assigning running tasks to nodes * @@ -106,10 +110,10 @@ public PersistentTasksClusterService( .addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createPersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_PERSISTENT_TASK_KEY, true); - finishPersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.FINISH_PERSISTENT_TASK_KEY, true); - removePersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_PERSISTENT_TASK_KEY, true); - updatePersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_TASK_STATE_KEY, true); + createPersistentTaskKey = clusterService.registerClusterManagerTask(CREATE_PERSISTENT_TASK, true); + finishPersistentTaskKey = clusterService.registerClusterManagerTask(FINISH_PERSISTENT_TASK, true); + removePersistentTaskKey = clusterService.registerClusterManagerTask(REMOVE_PERSISTENT_TASK, true); + updatePersistentTaskKey = clusterService.registerClusterManagerTask(UPDATE_TASK_STATE, true); } // visible for testing only diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 49065be0abb25..994c981b8bb0c 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -55,7 +55,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; @@ -87,6 +86,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_REPOSITORY; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_REPOSITORY; import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; @@ -157,8 +158,8 @@ public RepositoriesService( threadPool::relativeTimeInMillis ); // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_REPOSITORY_KEY, true); - deleteRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_REPOSITORY_KEY, true); + putRepositoryTaskKey = clusterService.registerClusterManagerTask(PUT_REPOSITORY, true); + deleteRepositoryTaskKey = clusterService.registerClusterManagerTask(DELETE_REPOSITORY, true); } /** diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 5e4bdc00a3b0e..448937fa3bb46 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -26,7 +26,6 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.metrics.OperationMetrics; @@ -64,6 +63,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_SEARCH_PIPELINE; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_SEARCH_PIPELINE; + /** * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines * against requests and responses. @@ -126,8 +128,8 @@ public SearchPipelineService( searchPipelinePlugins, p -> p.getSearchPhaseResultsProcessors(parameters) ); - putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true); - deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true); + putPipelineTaskKey = clusterService.registerClusterManagerTask(PUT_SEARCH_PIPELINE, true); + deletePipelineTaskKey = clusterService.registerClusterManagerTask(DELETE_SEARCH_PIPELINE, true); } private static Map> processorFactories( diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 89403b15f6aca..5ecd90dda1dea 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -69,7 +69,6 @@ import org.opensearch.cluster.routing.ShardsIterator; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -126,6 +125,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_UPGRADED; +import static org.opensearch.cluster.service.ClusterManagerTask.RESTORE_SNAPSHOT; import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; import static org.opensearch.common.util.IndexUtils.filterIndices; import static org.opensearch.common.util.set.Sets.newHashSet; @@ -229,7 +229,7 @@ public RestoreService( this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier; // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true); + restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(RESTORE_SNAPSHOT, true); } /** diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 0972f5dad0fa2..c5470a3cea517 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -74,7 +74,6 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -137,6 +136,9 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableList; import static org.opensearch.cluster.SnapshotsInProgress.completed; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_SNAPSHOT; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_SNAPSHOT; +import static org.opensearch.cluster.service.ClusterManagerTask.UPDATE_SNAPSHOT_STATE; import static org.opensearch.common.util.IndexUtils.filterIndices; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; @@ -268,9 +270,9 @@ public SnapshotsService( } // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_SNAPSHOT_KEY, true); - deleteSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SNAPSHOT_KEY, true); - updateSnapshotStateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SNAPSHOT_STATE_KEY, true); + createSnapshotTaskKey = clusterService.registerClusterManagerTask(CREATE_SNAPSHOT, true); + deleteSnapshotTaskKey = clusterService.registerClusterManagerTask(DELETE_SNAPSHOT, true); + updateSnapshotStateTaskKey = clusterService.registerClusterManagerTask(UPDATE_SNAPSHOT_STATE, true); } /** diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java index d9e6b2d90cfca..90ae3bece80c8 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java @@ -95,6 +95,9 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX; +import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_INDEX; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_MAPPING; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -914,8 +917,8 @@ public void testThrottlingForTaskSubmission() throws InterruptedException { int taskId = 1; final CyclicBarrier barrier = new CyclicBarrier(2); final CountDownLatch latch = new CountDownLatch(1); - final String taskName = "test"; - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = clusterManagerService.registerClusterManagerTask(taskName, true); + final ClusterManagerTask task = CREATE_INDEX; + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = clusterManagerService.registerClusterManagerTask(task, true); class Task { private final int id; @@ -945,7 +948,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } } - clusterManagerService.clusterManagerTaskThrottler.updateLimit(taskName, throttlingLimit); + clusterManagerService.clusterManagerTaskThrottler.updateLimit(task.getKey(), throttlingLimit); final ClusterStateTaskListener listener = new ClusterStateTaskListener() { @Override @@ -959,7 +962,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // submit one task which will be execution, post that will submit throttlingLimit tasks. try { clusterManagerService.submitStateUpdateTask( - taskName, + task.getKey(), new Task(taskId++), ClusterStateTaskConfig.build(randomFrom(Priority.values())), executor, @@ -974,7 +977,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS for (int i = 0; i < throttlingLimit; i++) { try { clusterManagerService.submitStateUpdateTask( - taskName, + task.getKey(), new Task(taskId++), ClusterStateTaskConfig.build(randomFrom(Priority.values())), executor, @@ -989,7 +992,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final AtomicReference assertionRef = new AtomicReference<>(); try { clusterManagerService.submitStateUpdateTask( - taskName, + task.getKey(), new Task(taskId++), ClusterStateTaskConfig.build(randomFrom(Priority.values())), executor, @@ -1010,9 +1013,9 @@ public void testThrottlingForMultipleTaskTypes() throws InterruptedException { int numberOfTask1 = randomIntBetween(throttlingLimitForTask1, 10); int numberOfTask2 = randomIntBetween(throttlingLimitForTask2, 10); int numberOfTask3 = randomIntBetween(throttlingLimitForTask3, 10); - String task1 = "Task1"; - String task2 = "Task2"; - String task3 = "Task3"; + ClusterManagerTask task1 = CREATE_INDEX; + ClusterManagerTask task2 = PUT_MAPPING; + ClusterManagerTask task3 = DELETE_INDEX; ClusterManagerTaskThrottler.ThrottlingKey throttlingKey1 = clusterManagerService.registerClusterManagerTask(task1, true); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey2 = clusterManagerService.registerClusterManagerTask(task2, true); @@ -1071,8 +1074,8 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey( } // configuring limits for Task1 and Task3. All task submission of Task2 should pass. - clusterManagerService.clusterManagerTaskThrottler.updateLimit(task1, throttlingLimitForTask1); - clusterManagerService.clusterManagerTaskThrottler.updateLimit(task3, throttlingLimitForTask3); + clusterManagerService.clusterManagerTaskThrottler.updateLimit(task1.getKey(), throttlingLimitForTask1); + clusterManagerService.clusterManagerTaskThrottler.updateLimit(task3.getKey(), throttlingLimitForTask3); final CountDownLatch latch = new CountDownLatch(numberOfTask1 + numberOfTask2 + numberOfTask3); AtomicInteger throttledTask1 = new AtomicInteger(); AtomicInteger throttledTask2 = new AtomicInteger(); @@ -1086,18 +1089,18 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey( @Override public void onFailure(String source, Exception e) { // Task3's timeout should have called this. - assertEquals(task3, source); + assertEquals(task3.getKey(), source); timedOutTask3.incrementAndGet(); latch.countDown(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (source.equals(task1)) { + if (source.equals(task1.getKey())) { succeededTask1.incrementAndGet(); - } else if (source.equals(task2)) { + } else if (source.equals(task2.getKey())) { succeededTask2.incrementAndGet(); - } else if (source.equals(task3)) { + } else if (source.equals(task3.getKey())) { succeededTask3.incrementAndGet(); } latch.countDown(); @@ -1113,7 +1116,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public void run() { try { clusterManagerService.submitStateUpdateTask( - task1, + task1.getKey(), new Task1(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), executor1, @@ -1133,7 +1136,7 @@ public void run() { public void run() { try { clusterManagerService.submitStateUpdateTask( - task2, + task2.getKey(), new Task2(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), executor2, @@ -1152,7 +1155,7 @@ public void run() { public void run() { try { clusterManagerService.submitStateUpdateTask( - task3, + task3.getKey(), new Task3(), ClusterStateTaskConfig.build(randomFrom(Priority.values()), new TimeValue(0)), executor3, diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index 9ad42554a8404..0c7c95828308f 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -34,6 +34,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.service.ClusterManagerTask.CREATE_INDEX; +import static org.opensearch.cluster.service.ClusterManagerTask.PUT_MAPPING; +import static org.opensearch.cluster.service.ClusterManagerTaskThrottler.THRESHOLD_SETTINGS; import static org.opensearch.test.ClusterServiceUtils.setState; /** @@ -73,13 +76,26 @@ public void testDefaults() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); - throttler.registerClusterManagerTask("create-index", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); + throttler.registerClusterManagerTask(CREATE_INDEX, true); + for (String key : throttler.THROTTLING_TASK_KEYS.keySet()) { - assertNull(throttler.getThrottlingLimit(key)); + assertEquals(ClusterManagerTask.fromKey(key).getThreshold(), throttler.getThrottlingLimit(key).intValue()); } } + public void testThrottlingThresholdNotConfigured() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { + return clusterService.getClusterManagerService().getMinNodeVersion(); + }, new ClusterManagerThrottlingStats()); + final IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> throttler.registerClusterManagerTask(ClusterManagerTask.fromKey("random-task"), true) + ); + assertEquals("No cluster manager task found for key: random-task", exception.getMessage()); + } + public void testValidateSettingsForDifferentVersion() { DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0); DiscoveryNode dataNode = getDataNode(Version.V_2_0_0); @@ -92,13 +108,13 @@ public void testValidateSettingsForDifferentVersion() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); - // set some limit for update snapshot tasks + // set some limit for put-mapping tasks int newLimit = randomIntBetween(1, 10); Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings))); // validate for empty setting, it shouldn't throw exception Settings emptySettings = Settings.builder().build(); @@ -122,13 +138,13 @@ public void testValidateSettingsForTaskWithoutRetryOnDataNode() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", false); + throttler.registerClusterManagerTask(PUT_MAPPING, false); - // set some limit for update snapshot tasks + // set some limit for put-mapping tasks int newLimit = randomIntBetween(1, 10); Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings))); } public void testUpdateSettingsForNullValue() { @@ -143,18 +159,20 @@ public void testUpdateSettingsForNullValue() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); + // check default value + assertEquals(ClusterManagerTask.PUT_MAPPING.getThreshold(), throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); // set some limit for put-mapping tasks int newLimit = randomIntBetween(1, 10); Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); clusterSettings.applySettings(newSettings); - assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue()); + assertEquals(newLimit, throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); // set limit to null Settings nullSettings = Settings.builder().build(); clusterSettings.applySettings(nullSettings); - assertNull(throttler.getThrottlingLimit("put-mapping")); + assertEquals(ClusterManagerTask.PUT_MAPPING.getThreshold(), throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); } public void testSettingsOnBootstrap() { @@ -177,10 +195,10 @@ public void testSettingsOnBootstrap() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); // assert that limit is applied on throttler - assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit("put-mapping").intValue()); + assertEquals(put_mapping_threshold_value, throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); // assert that delay setting is applied on throttler assertEquals(baseDelay, ClusterManagerTaskThrottler.getBaseDelayForRetry().seconds()); assertEquals(maxDelay, ClusterManagerTaskThrottler.getMaxDelayForRetry().seconds()); @@ -222,10 +240,10 @@ public void testValidateSettingsForUnknownTask() { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - // set some limit for update snapshot tasks + // set some limit for random tasks int newLimit = randomIntBetween(1, 10); Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings))); } public void testUpdateThrottlingLimitForBasicSanity() { @@ -240,19 +258,19 @@ public void testUpdateThrottlingLimitForBasicSanity() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); - // set some limit for update snapshot tasks + // set some limit for put-mapping tasks long newLimit = randomLongBetween(1, 10); Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); clusterSettings.applySettings(newSettings); - assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue()); + assertEquals(newLimit, throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); - // set update snapshot task limit to default - newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", -1).build(); + // set put-mapping task limit to 20 + newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", 20).build(); clusterSettings.applySettings(newSettings); - assertNull(throttler.getThrottlingLimit("put-mapping")); + assertEquals(20, throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); } public void testValidateSettingForLimit() { @@ -267,10 +285,10 @@ public void testValidateSettingForLimit() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.values", -5).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); + Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", -5).build(); + assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(THRESHOLD_SETTINGS.get(newSettings))); } public void testUpdateLimit() { @@ -278,12 +296,12 @@ public void testUpdateLimit() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); - throttler.registerClusterManagerTask("put-mapping", true); + throttler.registerClusterManagerTask(PUT_MAPPING, true); - throttler.updateLimit("test", 5); - assertEquals(5L, throttler.getThrottlingLimit("test").intValue()); - throttler.updateLimit("test", -1); - assertNull(throttler.getThrottlingLimit("test")); + throttler.updateLimit(PUT_MAPPING.getKey(), 5); + assertEquals(5L, throttler.getThrottlingLimit(PUT_MAPPING.getKey()).intValue()); + throttler.updateLimit(PUT_MAPPING.getKey(), -1); + assertNull(throttler.getThrottlingLimit(PUT_MAPPING.getKey())); } private DiscoveryNode getDataNode(Version version) { @@ -308,24 +326,24 @@ private DiscoveryNode getClusterManagerNode(Version version) { public void testThrottlingForDisabledThrottlingTask() { ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); - String taskKey = "test"; + ClusterManagerTask task = CREATE_INDEX; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(task, false); // adding limit directly in thresholds - throttler.updateLimit(taskKey, 5); + throttler.updateLimit(task.getKey(), 5); // adding 10 tasks, should pass as throttling is disabled for task - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 10)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 10)); // Asserting that there was not any throttling for it - assertEquals(0L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(0L, throttlingStats.getThrottlingCount(task.getKey())); // Asserting value in tasksCount map to make sure it gets updated even when throttling is disabled - assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(task.getKey())); } public void testThrottlingForInitialStaticSettingAndVersionCheck() { @@ -346,11 +364,11 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() { ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(PUT_MAPPING, true); - // verifying adding more tasks then threshold passes - throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, put_mapping_threshold_value + 5)); - assertEquals(0L, throttlingStats.getThrottlingCount("put-mapping")); + // verifying adding more tasks than threshold passes + throttler.onBeginSubmit(getMockUpdateTaskList(PUT_MAPPING.getKey(), throttlingKey, put_mapping_threshold_value + 5)); + assertEquals(0L, throttlingStats.getThrottlingCount(PUT_MAPPING.getKey())); // Removing older version node from cluster setState( @@ -362,90 +380,90 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() { // As queue already have more tasks than threshold from previous call. assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, 3)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(PUT_MAPPING.getKey(), throttlingKey, 3)) ); - assertEquals(3L, throttlingStats.getThrottlingCount("put-mapping")); + assertEquals(3L, throttlingStats.getThrottlingCount(PUT_MAPPING.getKey())); } public void testThrottling() { ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); - String taskKey = "test"; + ClusterManagerTask task = CREATE_INDEX; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(task, true); - throttler.updateLimit(taskKey, 5); + throttler.updateLimit(task.getKey(), 5); // adding 3 tasks - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)); // adding 3 more tasks, these tasks should be throttled // taskCount in Queue: 3 Threshold: 5 assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)) ); - assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(3L, throttlingStats.getThrottlingCount(task.getKey())); // remove one task - throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + throttler.onBeginProcessing(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)); // add 3 tasks should pass now. // taskCount in Queue: 2 Threshold: 5 - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)); // adding one task will throttle // taskCount in Queue: 5 Threshold: 5 assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)) ); - assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(4L, throttlingStats.getThrottlingCount(task.getKey())); // update limit of threshold 6 - throttler.updateLimit(taskKey, 6); + throttler.updateLimit(task.getKey(), 6); // adding one task should pass now - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)); } public void testThrottlingWithLock() { ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); - String taskKey = "test"; + ClusterManagerTask task = CREATE_INDEX; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(task, true); - throttler.updateLimit(taskKey, 5); + throttler.updateLimit(task.getKey(), 5); // adding 3 tasks - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)); // adding 3 more tasks, these tasks should be throttled // taskCount in Queue: 3 Threshold: 5 assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)) ); - assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(3L, throttlingStats.getThrottlingCount(task.getKey())); // remove one task - throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + throttler.onBeginProcessing(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)); // add 3 tasks should pass now. // taskCount in Queue: 2 Threshold: 5 - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)); final CountDownLatch latch = new CountDownLatch(1); Thread threadToLock = null; try { // Taking lock on tasksCount will not impact throttling behaviour now. threadToLock = new Thread(() -> { - throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + throttler.tasksCount.computeIfPresent(task.getKey(), (key, count) -> { try { latch.await(); } catch (InterruptedException e) { @@ -460,11 +478,11 @@ public void testThrottlingWithLock() { // taskCount in Queue: 5 Threshold: 5 final ClusterManagerThrottlingException exception = assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)) ); - assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); - assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); - assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals("Throttling Exception : Limit exceeded for create-index", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(task.getKey())); + assertEquals(4L, throttlingStats.getThrottlingCount(task.getKey())); } finally { if (threadToLock != null) { latch.countDown(); @@ -476,37 +494,37 @@ public void testThrottlingWithLock() { } } } - assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(task.getKey())); } public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); - String taskKey = "test"; + ClusterManagerTask task = CREATE_INDEX; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(task, true); - throttler.updateLimit(taskKey, 5); + throttler.updateLimit(task.getKey(), 5); // adding 3 tasks - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)); // adding 3 more tasks, these tasks should be throttled // taskCount in Queue: 3 Threshold: 5 assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)) ); - assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(3L, throttlingStats.getThrottlingCount(task.getKey())); // remove one task - throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + throttler.onBeginProcessing(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)); // add 3 tasks should pass now. // taskCount in Queue: 2 Threshold: 5 - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 3)); final CountDownLatch latch = new CountDownLatch(1); Thread threadToLock = null; @@ -515,7 +533,7 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { try { // Taking lock on tasksCount will not impact throttling behaviour now. threadToLock = new Thread(() -> { - throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + throttler.tasksCount.computeIfPresent(task.getKey(), (key, count) -> { try { latch.await(); } catch (InterruptedException e) { @@ -533,10 +551,10 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { // taskCount in Queue: 5 Threshold: 5 final ClusterManagerThrottlingException exception = assertThrows( ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + () -> throttler.onBeginSubmit(getMockUpdateTaskList(task.getKey(), throttlingKey, 1)) ); - assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); - assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + assertEquals("Throttling Exception : Limit exceeded for create-index", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(task.getKey())); latch2.countDown(); }); submittingThread.start(); @@ -547,7 +565,7 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { } catch (InterruptedException e) { throw new RuntimeException(e); } - assertEquals(13L, throttlingStats.getThrottlingCount(taskKey)); + assertEquals(13L, throttlingStats.getThrottlingCount(task.getKey())); } finally { if (threadToLock != null) { latch.countDown(); @@ -566,7 +584,7 @@ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { } } } - assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(task.getKey())); } private List getMockUpdateTaskList(