Skip to content

Commit 58e444a

Browse files
committed
Merge branch 'main' into add_skip_factor
2 parents 36c9282 + a9c9586 commit 58e444a

File tree

11 files changed

+494
-4
lines changed

11 files changed

+494
-4
lines changed

.github/benchmark-configs.json

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
"SINGLE_NODE_CLUSTER": "true",
5959
"MIN_DISTRIBUTION": "true",
6060
"TEST_WORKLOAD": "big5",
61-
"WORKLOAD_PARAMS": "{\"snapshot_repo_name\":\"benchmark-workloads-repo-3x\",\"snapshot_bucket_name\":\"benchmark-workload-snapshots\",\"snapshot_region\":\"us-east-1\",\"snapshot_base_path\":\"workload-snapshots-3x\",\"snapshot_name\":\"big5_1_shard_single_client\"}",
61+
"WORKLOAD_PARAMS": "{\"snapshot_repo_name\":\"benchmark-workloads-repo-3x\",\"snapshot_bucket_name\":\"benchmark-workload-snapshots\",\"snapshot_region\":\"us-east-1\",\"snapshot_base_path\":\"10.2.1\",\"snapshot_name\":\"big5_1_shard_single_client\"}",
6262
"CAPTURE_NODE_STAT": "true",
6363
"TEST_PROCEDURE": "restore-from-snapshot"
6464
},
@@ -204,5 +204,22 @@
204204
"data_instance_config": "4vCPU, 32G Mem, 16G Heap"
205205
},
206206
"baseline_cluster_config": "x64-r5.xlarge-1-shard-0-replica-snapshot-baseline"
207+
},
208+
"id_13": {
209+
"description": "Search only test-procedure for HTTP_LOGS, uses snapshot to restore the data for OS-3.0.0",
210+
"supported_major_versions": ["3"],
211+
"cluster-benchmark-configs": {
212+
"SINGLE_NODE_CLUSTER": "true",
213+
"MIN_DISTRIBUTION": "true",
214+
"TEST_WORKLOAD": "http_logs",
215+
"WORKLOAD_PARAMS": "{\"snapshot_repo_name\":\"benchmark-workloads-repo-3x\",\"snapshot_bucket_name\":\"benchmark-workload-snapshots\",\"snapshot_region\":\"us-east-1\",\"snapshot_base_path\":\"10.2.1\",\"snapshot_name\":\"http_logs_1_shard\"}",
216+
"CAPTURE_NODE_STAT": "true",
217+
"TEST_PROCEDURE": "restore-from-snapshot"
218+
},
219+
"cluster_configuration": {
220+
"size": "Single-Node",
221+
"data_instance_config": "4vCPU, 32G Mem, 16G Heap"
222+
},
223+
"baseline_cluster_config": "x64-r5.xlarge-1-shard-0-replica-snapshot-baseline"
207224
}
208225
}

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- [Rule based auto-tagging] Add Delete Rule API ([#18184](https://github.com/opensearch-project/OpenSearch/pull/18184))
1111
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
1212
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
13+
- [Rule based Auto-tagging] Add wlm `ActionFilter` ([#17791](https://github.com/opensearch-project/OpenSearch/pull/17791))
1314
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
1415
- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082))
1516
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
@@ -18,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1819
- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
1920
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
2021
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
22+
- Reject close index requests, while remote store migration is in progress.([#18327](https://github.com/opensearch-project/OpenSearch/pull/18327))
2123
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
2224
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
2325
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817)))

modules/autotagging-commons/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99

1010
opensearchplugin {
11+
name = "rule-framework"
1112
description = 'OpenSearch Rule Framework plugin'
1213
classname = 'org.opensearch.rule.RuleFrameworkPlugin'
1314
}

modules/autotagging-commons/spi/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ base {
1616
}
1717

1818
dependencies {
19-
api project(':modules:autotagging-commons:common')
19+
implementation project(':modules:autotagging-commons:common')
2020
}
2121

2222
disableTasks("forbiddenApisMain")

plugins/workload-management/build.gradle

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,17 @@ apply plugin: 'opensearch.internal-cluster-test'
1717
opensearchplugin {
1818
description = 'OpenSearch Workload Management Plugin.'
1919
classname = 'org.opensearch.plugin.wlm.WorkloadManagementPlugin'
20-
extendedPlugins = [] // Remove autotagging-commons since it's not a plugin
20+
extendedPlugins = ['rule-framework']
2121
}
2222

2323
dependencies {
2424
implementation project(':modules:autotagging-commons:common')
25-
implementation project(':modules:autotagging-commons:spi')
25+
compileOnly project(':modules:autotagging-commons:spi')
26+
compileOnly project(':modules:autotagging-commons')
27+
testImplementation project(':modules:autotagging-commons')
28+
testImplementation project(':modules:autotagging-commons:common')
29+
}
30+
31+
testClusters.all {
32+
testDistribution = 'archive'
2633
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.wlm;
10+
11+
import org.opensearch.action.ActionRequest;
12+
import org.opensearch.action.IndicesRequest;
13+
import org.opensearch.action.search.SearchRequest;
14+
import org.opensearch.action.support.ActionFilter;
15+
import org.opensearch.action.support.ActionFilterChain;
16+
import org.opensearch.core.action.ActionListener;
17+
import org.opensearch.core.action.ActionResponse;
18+
import org.opensearch.plugin.wlm.rule.attribute_extractor.IndicesExtractor;
19+
import org.opensearch.rule.InMemoryRuleProcessingService;
20+
import org.opensearch.tasks.Task;
21+
import org.opensearch.threadpool.ThreadPool;
22+
import org.opensearch.wlm.WorkloadGroupTask;
23+
24+
import java.util.List;
25+
import java.util.Optional;
26+
27+
/**
28+
* This class is responsible to evaluate and assign the WORKLOAD_GROUP_ID header in ThreadContext
29+
*/
30+
public class AutoTaggingActionFilter implements ActionFilter {
31+
private final InMemoryRuleProcessingService ruleProcessingService;
32+
ThreadPool threadPool;
33+
34+
/**
35+
* Main constructor
36+
* @param ruleProcessingService provides access to in memory view of rules
37+
* @param threadPool to access assign the label
38+
*/
39+
public AutoTaggingActionFilter(InMemoryRuleProcessingService ruleProcessingService, ThreadPool threadPool) {
40+
this.ruleProcessingService = ruleProcessingService;
41+
this.threadPool = threadPool;
42+
}
43+
44+
@Override
45+
public int order() {
46+
return Integer.MAX_VALUE;
47+
}
48+
49+
@Override
50+
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
51+
Task task,
52+
String action,
53+
Request request,
54+
ActionListener<Response> listener,
55+
ActionFilterChain<Request, Response> chain
56+
) {
57+
final boolean isValidRequest = request instanceof SearchRequest;
58+
59+
if (!isValidRequest) {
60+
chain.proceed(task, action, request, listener);
61+
return;
62+
}
63+
Optional<String> label = ruleProcessingService.evaluateLabel(List.of(new IndicesExtractor((IndicesRequest) request)));
64+
65+
label.ifPresent(s -> threadPool.getThreadContext().putHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, s));
66+
chain.proceed(task, action, request, listener);
67+
}
68+
}

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.plugin.wlm;
1010

1111
import org.opensearch.action.ActionRequest;
12+
import org.opensearch.action.support.ActionFilter;
1213
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
1314
import org.opensearch.cluster.node.DiscoveryNodes;
1415
import org.opensearch.cluster.service.ClusterService;
@@ -44,10 +45,12 @@
4445
import org.opensearch.repositories.RepositoriesService;
4546
import org.opensearch.rest.RestController;
4647
import org.opensearch.rest.RestHandler;
48+
import org.opensearch.rule.InMemoryRuleProcessingService;
4749
import org.opensearch.rule.RulePersistenceService;
4850
import org.opensearch.rule.autotagging.FeatureType;
4951
import org.opensearch.rule.service.IndexStoredRulePersistenceService;
5052
import org.opensearch.rule.spi.RuleFrameworkExtension;
53+
import org.opensearch.rule.storage.DefaultAttributeValueStore;
5154
import org.opensearch.rule.storage.IndexBasedRuleQueryMapper;
5255
import org.opensearch.rule.storage.XContentRuleParser;
5356
import org.opensearch.script.ScriptService;
@@ -75,6 +78,8 @@ public class WorkloadManagementPlugin extends Plugin implements ActionPlugin, Sy
7578

7679
private final RulePersistenceServiceHolder rulePersistenceServiceHolder = new RulePersistenceServiceHolder();
7780

81+
private AutoTaggingActionFilter autoTaggingActionFilter;
82+
7883
/**
7984
* Default constructor
8085
*/
@@ -101,9 +106,19 @@ public Collection<Object> createComponents(
101106
new XContentRuleParser(WorkloadGroupFeatureType.INSTANCE),
102107
new IndexBasedRuleQueryMapper()
103108
);
109+
InMemoryRuleProcessingService ruleProcessingService = new InMemoryRuleProcessingService(
110+
WorkloadGroupFeatureType.INSTANCE,
111+
DefaultAttributeValueStore::new
112+
);
113+
autoTaggingActionFilter = new AutoTaggingActionFilter(ruleProcessingService, threadPool);
104114
return Collections.emptyList();
105115
}
106116

117+
@Override
118+
public List<ActionFilter> getActionFilters() {
119+
return List.of(autoTaggingActionFilter);
120+
}
121+
107122
@Override
108123
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
109124
return List.of(
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.wlm;
10+
11+
import org.opensearch.action.ActionRequest;
12+
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
13+
import org.opensearch.action.search.SearchRequest;
14+
import org.opensearch.action.support.ActionFilterChain;
15+
import org.opensearch.common.util.concurrent.ThreadContext;
16+
import org.opensearch.core.action.ActionListener;
17+
import org.opensearch.core.action.ActionResponse;
18+
import org.opensearch.core.common.io.stream.StreamOutput;
19+
import org.opensearch.rule.InMemoryRuleProcessingService;
20+
import org.opensearch.rule.autotagging.Attribute;
21+
import org.opensearch.rule.autotagging.FeatureType;
22+
import org.opensearch.rule.storage.DefaultAttributeValueStore;
23+
import org.opensearch.tasks.Task;
24+
import org.opensearch.test.OpenSearchTestCase;
25+
import org.opensearch.threadpool.TestThreadPool;
26+
import org.opensearch.threadpool.ThreadPool;
27+
import org.opensearch.wlm.WorkloadGroupTask;
28+
29+
import java.io.IOException;
30+
import java.util.Map;
31+
import java.util.Optional;
32+
33+
import static org.mockito.Mockito.anyList;
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.spy;
36+
import static org.mockito.Mockito.times;
37+
import static org.mockito.Mockito.verify;
38+
import static org.mockito.Mockito.when;
39+
40+
public class AutoTaggingActionFilterTests extends OpenSearchTestCase {
41+
42+
AutoTaggingActionFilter autoTaggingActionFilter;
43+
InMemoryRuleProcessingService ruleProcessingService;
44+
ThreadPool threadPool;
45+
46+
public void setUp() throws Exception {
47+
super.setUp();
48+
threadPool = new TestThreadPool("AutoTaggingActionFilterTests");
49+
ruleProcessingService = spy(new InMemoryRuleProcessingService(WLMFeatureType.WLM, DefaultAttributeValueStore::new));
50+
autoTaggingActionFilter = new AutoTaggingActionFilter(ruleProcessingService, threadPool);
51+
}
52+
53+
public void tearDown() throws Exception {
54+
super.tearDown();
55+
threadPool.shutdownNow();
56+
}
57+
58+
public void testOrder() {
59+
assertEquals(Integer.MAX_VALUE, autoTaggingActionFilter.order());
60+
}
61+
62+
public void testApplyForValidRequest() {
63+
SearchRequest request = mock(SearchRequest.class);
64+
ActionFilterChain<ActionRequest, ActionResponse> mockFilterChain = mock(TestActionFilterChain.class);
65+
when(request.indices()).thenReturn(new String[] { "foo" });
66+
try (ThreadContext.StoredContext context = threadPool.getThreadContext().stashContext()) {
67+
when(ruleProcessingService.evaluateLabel(anyList())).thenReturn(Optional.of("TestQG_ID"));
68+
autoTaggingActionFilter.apply(mock(Task.class), "Test", request, null, mockFilterChain);
69+
70+
assertEquals("TestQG_ID", threadPool.getThreadContext().getHeader(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER));
71+
verify(ruleProcessingService, times(1)).evaluateLabel(anyList());
72+
}
73+
}
74+
75+
public void testApplyForInValidRequest() {
76+
ActionFilterChain<ActionRequest, ActionResponse> mockFilterChain = mock(TestActionFilterChain.class);
77+
CancelTasksRequest request = new CancelTasksRequest();
78+
autoTaggingActionFilter.apply(mock(Task.class), "Test", request, null, mockFilterChain);
79+
80+
verify(ruleProcessingService, times(0)).evaluateLabel(anyList());
81+
}
82+
83+
public enum WLMFeatureType implements FeatureType {
84+
WLM;
85+
86+
@Override
87+
public String getName() {
88+
return "";
89+
}
90+
91+
@Override
92+
public Map<String, Attribute> getAllowedAttributesRegistry() {
93+
return Map.of("test_attribute", TestAttribute.TEST_ATTRIBUTE);
94+
}
95+
96+
@Override
97+
public void registerFeatureType() {}
98+
}
99+
100+
public enum TestAttribute implements Attribute {
101+
TEST_ATTRIBUTE("test_attribute"),
102+
INVALID_ATTRIBUTE("invalid_attribute");
103+
104+
private final String name;
105+
106+
TestAttribute(String name) {
107+
this.name = name;
108+
}
109+
110+
@Override
111+
public String getName() {
112+
return name;
113+
}
114+
115+
@Override
116+
public void validateAttribute() {}
117+
118+
@Override
119+
public void writeTo(StreamOutput out) throws IOException {}
120+
}
121+
122+
private static class TestActionFilterChain implements ActionFilterChain<ActionRequest, ActionResponse> {
123+
@Override
124+
public void proceed(Task task, String action, ActionRequest request, ActionListener<ActionResponse> listener) {
125+
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)