8
8
9
9
package org .opensearch .remotemigration ;
10
10
11
+ import org .opensearch .action .admin .cluster .allocation .ClusterAllocationExplanation ;
11
12
import org .opensearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
13
+ import org .opensearch .action .support .ActiveShardCount ;
12
14
import org .opensearch .cluster .node .DiscoveryNode ;
13
15
import org .opensearch .cluster .node .DiscoveryNodes ;
14
16
import org .opensearch .cluster .routing .IndexShardRoutingTable ;
15
17
import org .opensearch .cluster .routing .ShardRouting ;
18
+ import org .opensearch .cluster .routing .ShardRoutingState ;
19
+ import org .opensearch .cluster .routing .allocation .AllocateUnassignedDecision ;
20
+ import org .opensearch .cluster .routing .allocation .MoveDecision ;
21
+ import org .opensearch .cluster .routing .allocation .NodeAllocationResult ;
22
+ import org .opensearch .cluster .routing .allocation .decider .Decision ;
23
+ import org .opensearch .common .Nullable ;
16
24
import org .opensearch .common .settings .Settings ;
25
+ import org .opensearch .index .IndexSettings ;
26
+ import org .opensearch .indices .replication .common .ReplicationType ;
17
27
28
+ import java .util .List ;
18
29
import java .util .Map ;
19
30
import java .util .Optional ;
20
31
32
+ import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_REMOTE_SEGMENT_STORE_REPOSITORY ;
33
+ import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_REMOTE_STORE_ENABLED ;
34
+ import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY ;
35
+ import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_REPLICATION_TYPE ;
36
+ import static org .opensearch .index .IndexSettings .INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING ;
21
37
import static org .opensearch .node .remotestore .RemoteStoreNodeService .MIGRATION_DIRECTION_SETTING ;
22
38
import static org .opensearch .node .remotestore .RemoteStoreNodeService .REMOTE_STORE_COMPATIBILITY_MODE_SETTING ;
23
39
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
@@ -35,7 +51,7 @@ protected void setClusterMode(String mode) {
35
51
}
36
52
37
53
// set the migration direction for cluster [remote_store, docrep, none]
38
- public void setDirection (String direction ) {
54
+ protected void setDirection (String direction ) {
39
55
updateSettingsRequest .persistentSettings (Settings .builder ().put (MIGRATION_DIRECTION_SETTING .getKey (), direction ));
40
56
assertAcked (internalCluster ().client ().admin ().cluster ().updateSettings (updateSettingsRequest ).actionGet ());
41
57
}
@@ -67,7 +83,7 @@ protected String allNodesExcept(String except) {
67
83
return exclude .toString ();
68
84
}
69
85
70
- // create a new test index
86
+ // create a new test index with un-allocated primary and no replicas
71
87
protected void prepareIndexWithoutReplica (Optional <String > name ) {
72
88
String indexName = name .orElse (TEST_INDEX );
73
89
internalCluster ().client ()
@@ -84,6 +100,33 @@ protected void prepareIndexWithoutReplica(Optional<String> name) {
84
100
.actionGet ();
85
101
}
86
102
103
+ // create a new test index with allocated primary and 1 unallocated replica
104
+ public void prepareIndexWithAllocatedPrimary (DiscoveryNode primaryShardNode , Optional <String > name ) {
105
+ String indexName = name .orElse (TEST_INDEX );
106
+ internalCluster ().client ()
107
+ .admin ()
108
+ .indices ()
109
+ .prepareCreate (indexName )
110
+ .setSettings (
111
+ Settings .builder ()
112
+ .put ("index.number_of_shards" , 1 )
113
+ .put ("index.number_of_replicas" , 1 )
114
+ .put ("index.routing.allocation.include._name" , primaryShardNode .getName ())
115
+ .put ("index.routing.allocation.exclude._name" , allNodesExcept (primaryShardNode .getName ()))
116
+ )
117
+ .setWaitForActiveShards (ActiveShardCount .ONE )
118
+ .execute ()
119
+ .actionGet ();
120
+
121
+ ensureYellowAndNoInitializingShards (TEST_INDEX );
122
+
123
+ logger .info (" --> verify allocation of primary shard" );
124
+ assertAllocation (true , primaryShardNode );
125
+
126
+ logger .info (" --> verify non-allocation of replica shard" );
127
+ assertNonAllocation (false );
128
+ }
129
+
87
130
protected ShardRouting getShardRouting (boolean isPrimary ) {
88
131
IndexShardRoutingTable table = internalCluster ().client ()
89
132
.admin ()
@@ -98,4 +141,142 @@ protected ShardRouting getShardRouting(boolean isPrimary) {
98
141
return (isPrimary ? table .primaryShard () : table .replicaShards ().get (0 ));
99
142
}
100
143
144
+ // obtain decision for allocation/relocation of a shard to a given node
145
+ protected Decision getDecisionForTargetNode (
146
+ DiscoveryNode targetNode ,
147
+ boolean isPrimary ,
148
+ boolean includeYesDecisions ,
149
+ boolean isRelocation
150
+ ) {
151
+ ClusterAllocationExplanation explanation = internalCluster ().client ()
152
+ .admin ()
153
+ .cluster ()
154
+ .prepareAllocationExplain ()
155
+ .setIndex (TEST_INDEX )
156
+ .setShard (0 )
157
+ .setPrimary (isPrimary )
158
+ .setIncludeYesDecisions (includeYesDecisions )
159
+ .get ()
160
+ .getExplanation ();
161
+
162
+ Decision requiredDecision = null ;
163
+ List <NodeAllocationResult > nodeAllocationResults ;
164
+ if (isRelocation ) {
165
+ MoveDecision moveDecision = explanation .getShardAllocationDecision ().getMoveDecision ();
166
+ nodeAllocationResults = moveDecision .getNodeDecisions ();
167
+ } else {
168
+ AllocateUnassignedDecision allocateUnassignedDecision = explanation .getShardAllocationDecision ().getAllocateDecision ();
169
+ nodeAllocationResults = allocateUnassignedDecision .getNodeDecisions ();
170
+ }
171
+
172
+ for (NodeAllocationResult nodeAllocationResult : nodeAllocationResults ) {
173
+ if (nodeAllocationResult .getNode ().equals (targetNode )) {
174
+ for (Decision decision : nodeAllocationResult .getCanAllocateDecision ().getDecisions ()) {
175
+ if (decision .label ().equals (NAME )) {
176
+ requiredDecision = decision ;
177
+ break ;
178
+ }
179
+ }
180
+ }
181
+ }
182
+
183
+ assertNotNull (requiredDecision );
184
+ return requiredDecision ;
185
+ }
186
+
187
+ // get allocation and relocation decisions for all nodes
188
+ protected void prepareDecisions () {
189
+ internalCluster ().client ()
190
+ .admin ()
191
+ .indices ()
192
+ .prepareUpdateSettings (TEST_INDEX )
193
+ .setSettings (Settings .builder ().put ("index.routing.allocation.exclude._name" , allNodesExcept (null )))
194
+ .execute ()
195
+ .actionGet ();
196
+ }
197
+
198
+ protected void attemptAllocation (@ Nullable String targetNodeName ) {
199
+ Settings .Builder settingsBuilder ;
200
+ if (targetNodeName != null ) {
201
+ settingsBuilder = Settings .builder ()
202
+ .put ("index.routing.allocation.include._name" , targetNodeName )
203
+ .put ("index.routing.allocation.exclude._name" , allNodesExcept (targetNodeName ));
204
+ } else {
205
+ String clusterManagerNodeName = internalCluster ().client ()
206
+ .admin ()
207
+ .cluster ()
208
+ .prepareState ()
209
+ .execute ()
210
+ .actionGet ()
211
+ .getState ()
212
+ .getNodes ()
213
+ .getClusterManagerNode ()
214
+ .getName ();
215
+ // to allocate freely among all nodes other than cluster-manager node
216
+ settingsBuilder = Settings .builder ()
217
+ .put ("index.routing.allocation.include._name" , allNodesExcept (clusterManagerNodeName ))
218
+ .put ("index.routing.allocation.exclude._name" , clusterManagerNodeName );
219
+ }
220
+ internalCluster ().client ().admin ().indices ().prepareUpdateSettings (TEST_INDEX ).setSettings (settingsBuilder ).execute ().actionGet ();
221
+ }
222
+
223
+ // verify that shard does not exist at targetNode
224
+ protected void assertNonAllocation (boolean isPrimary ) {
225
+ if (isPrimary ) {
226
+ ensureRed (TEST_INDEX );
227
+ } else {
228
+ ensureYellowAndNoInitializingShards (TEST_INDEX );
229
+ }
230
+ ShardRouting shardRouting = getShardRouting (isPrimary );
231
+ assertFalse (shardRouting .active ());
232
+ assertNull (shardRouting .currentNodeId ());
233
+ assertEquals (ShardRoutingState .UNASSIGNED , shardRouting .state ());
234
+ }
235
+
236
+ // verify that shard exists at targetNode
237
+ protected void assertAllocation (boolean isPrimary , @ Nullable DiscoveryNode targetNode ) {
238
+ ShardRouting shardRouting = getShardRouting (isPrimary );
239
+ assertTrue (shardRouting .active ());
240
+ assertNotNull (shardRouting .currentNodeId ());
241
+ if (targetNode != null ) {
242
+ assertEquals (shardRouting .currentNodeId (), targetNode .getId ());
243
+ }
244
+ }
245
+
246
+ // verify that the created index is not remote store backed
247
+ protected void assertNonRemoteStoreBackedIndex (String indexName ) {
248
+ Settings indexSettings = internalCluster ().client ()
249
+ .admin ()
250
+ .indices ()
251
+ .prepareGetIndex ()
252
+ .execute ()
253
+ .actionGet ()
254
+ .getSettings ()
255
+ .get (indexName );
256
+ assertEquals (ReplicationType .DOCUMENT .toString (), indexSettings .get (SETTING_REPLICATION_TYPE ));
257
+ assertNull (indexSettings .get (SETTING_REMOTE_STORE_ENABLED ));
258
+ assertNull (indexSettings .get (SETTING_REMOTE_SEGMENT_STORE_REPOSITORY ));
259
+ assertNull (indexSettings .get (SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY ));
260
+ }
261
+
262
+ // verify that the created index is remote store backed
263
+ protected void assertRemoteStoreBackedIndex (String indexName ) {
264
+ Settings indexSettings = internalCluster ().client ()
265
+ .admin ()
266
+ .indices ()
267
+ .prepareGetIndex ()
268
+ .execute ()
269
+ .actionGet ()
270
+ .getSettings ()
271
+ .get (indexName );
272
+ assertEquals (ReplicationType .SEGMENT .toString (), indexSettings .get (SETTING_REPLICATION_TYPE ));
273
+ assertEquals ("true" , indexSettings .get (SETTING_REMOTE_STORE_ENABLED ));
274
+ assertEquals (REPOSITORY_NAME , indexSettings .get (SETTING_REMOTE_SEGMENT_STORE_REPOSITORY ));
275
+ assertEquals (REPOSITORY_2_NAME , indexSettings .get (SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY ));
276
+ assertEquals (
277
+ IndexSettings .DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL ,
278
+ INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING .get (indexSettings )
279
+ );
280
+ }
281
+
101
282
}
0 commit comments