23
23
import org .opensearch .cluster .routing .RecoverySource ;
24
24
import org .opensearch .cluster .routing .allocation .command .MoveAllocationCommand ;
25
25
import org .opensearch .common .Priority ;
26
+ import org .opensearch .common .blobstore .BlobPath ;
26
27
import org .opensearch .common .settings .Settings ;
27
28
import org .opensearch .common .unit .TimeValue ;
28
29
import org .opensearch .common .util .concurrent .BufferedAsyncIOProcessor ;
58
59
59
60
import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
60
61
import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
62
+ import static org .opensearch .index .remote .RemoteStoreEnums .DataCategory .SEGMENTS ;
63
+ import static org .opensearch .index .remote .RemoteStoreEnums .DataCategory .TRANSLOG ;
64
+ import static org .opensearch .index .remote .RemoteStoreEnums .DataType .DATA ;
65
+ import static org .opensearch .index .remote .RemoteStoreEnums .DataType .METADATA ;
61
66
import static org .opensearch .indices .RemoteStoreSettings .CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING ;
67
+ import static org .opensearch .test .OpenSearchTestCase .getShardLevelBlobPath ;
62
68
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
63
69
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertHitCount ;
64
70
import static org .hamcrest .Matchers .comparesEqualTo ;
@@ -183,13 +189,9 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
183
189
createIndex (INDEX_NAME , remoteStoreIndexSettings (1 , 10000l , -1 ));
184
190
int numberOfIterations = randomIntBetween (5 , 15 );
185
191
indexData (numberOfIterations , true , INDEX_NAME );
186
- String indexUUID = client ().admin ()
187
- .indices ()
188
- .prepareGetSettings (INDEX_NAME )
189
- .get ()
190
- .getSetting (INDEX_NAME , IndexMetadata .SETTING_INDEX_UUID );
191
- Path indexPath = Path .of (String .valueOf (segmentRepoPath ), indexUUID , "/0/segments/metadata" );
192
-
192
+ String shardPath = getShardLevelBlobPath (client (), INDEX_NAME , BlobPath .cleanPath (), "0" , SEGMENTS , METADATA ).buildAsString ();
193
+ Path indexPath = Path .of (segmentRepoPath + "/" + shardPath );
194
+ ;
193
195
IndexShard indexShard = getIndexShard (dataNode , INDEX_NAME );
194
196
int lastNMetadataFilesToKeep = indexShard .getRemoteStoreSettings ().getMinRemoteSegmentMetadataFiles ();
195
197
// Delete is async.
@@ -213,12 +215,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
213
215
createIndex (INDEX_NAME , remoteStoreIndexSettings (1 , 10000l , -1 ));
214
216
int numberOfIterations = randomIntBetween (5 , 15 );
215
217
indexData (numberOfIterations , false , INDEX_NAME );
216
- String indexUUID = client ().admin ()
217
- .indices ()
218
- .prepareGetSettings (INDEX_NAME )
219
- .get ()
220
- .getSetting (INDEX_NAME , IndexMetadata .SETTING_INDEX_UUID );
221
- Path indexPath = Path .of (String .valueOf (segmentRepoPath ), indexUUID , "/0/segments/metadata" );
218
+ String shardPath = getShardLevelBlobPath (client (), INDEX_NAME , BlobPath .cleanPath (), "0" , SEGMENTS , METADATA ).buildAsString ();
219
+ Path indexPath = Path .of (segmentRepoPath + "/" + shardPath );
222
220
int actualFileCount = getFileCount (indexPath );
223
221
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
224
222
MatcherAssert .assertThat (actualFileCount , is (oneOf (numberOfIterations - 1 , numberOfIterations , numberOfIterations + 1 )));
@@ -232,12 +230,8 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
232
230
createIndex (INDEX_NAME , remoteStoreIndexSettings (1 , 10000l , -1 ));
233
231
int numberOfIterations = randomIntBetween (5 , 15 );
234
232
indexData (numberOfIterations , true , INDEX_NAME );
235
- String indexUUID = client ().admin ()
236
- .indices ()
237
- .prepareGetSettings (INDEX_NAME )
238
- .get ()
239
- .getSetting (INDEX_NAME , IndexMetadata .SETTING_INDEX_UUID );
240
- Path indexPath = Path .of (String .valueOf (segmentRepoPath ), indexUUID , "/0/segments/metadata" );
233
+ String shardPath = getShardLevelBlobPath (client (), INDEX_NAME , BlobPath .cleanPath (), "0" , SEGMENTS , METADATA ).buildAsString ();
234
+ Path indexPath = Path .of (segmentRepoPath + "/" + shardPath );
241
235
int actualFileCount = getFileCount (indexPath );
242
236
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
243
237
MatcherAssert .assertThat (actualFileCount , is (oneOf (4 )));
@@ -251,12 +245,9 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
251
245
createIndex (INDEX_NAME , remoteStoreIndexSettings (1 , 10000l , -1 ));
252
246
int numberOfIterations = randomIntBetween (12 , 18 );
253
247
indexData (numberOfIterations , true , INDEX_NAME );
254
- String indexUUID = client ().admin ()
255
- .indices ()
256
- .prepareGetSettings (INDEX_NAME )
257
- .get ()
258
- .getSetting (INDEX_NAME , IndexMetadata .SETTING_INDEX_UUID );
259
- Path indexPath = Path .of (String .valueOf (segmentRepoPath ), indexUUID , "/0/segments/metadata" );
248
+ String shardPath = getShardLevelBlobPath (client (), INDEX_NAME , BlobPath .cleanPath (), "0" , SEGMENTS , METADATA ).buildAsString ();
249
+ Path indexPath = Path .of (segmentRepoPath + "/" + shardPath );
250
+ ;
260
251
int actualFileCount = getFileCount (indexPath );
261
252
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
262
253
MatcherAssert .assertThat (actualFileCount , is (oneOf (numberOfIterations + 1 )));
@@ -590,12 +581,8 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
590
581
flushAndRefresh (INDEX_NAME );
591
582
592
583
// 3. Delete data from remote segment store
593
- String indexUUID = client ().admin ()
594
- .indices ()
595
- .prepareGetSettings (INDEX_NAME )
596
- .get ()
597
- .getSetting (INDEX_NAME , IndexMetadata .SETTING_INDEX_UUID );
598
- Path segmentDataPath = Path .of (String .valueOf (segmentRepoPath ), indexUUID , "/0/segments/data" );
584
+ String shardPath = getShardLevelBlobPath (client (), INDEX_NAME , BlobPath .cleanPath (), "0" , SEGMENTS , DATA ).buildAsString ();
585
+ Path segmentDataPath = Path .of (segmentRepoPath + "/" + shardPath );
599
586
600
587
try (Stream <Path > files = Files .list (segmentDataPath )) {
601
588
files .forEach (p -> {
@@ -850,7 +837,8 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
850
837
.get ()
851
838
.getSetting (INDEX_NAME , IndexMetadata .SETTING_INDEX_UUID );
852
839
853
- Path translogMetaDataPath = Path .of (String .valueOf (translogRepoPath ), indexUUID , "/0/translog/metadata" );
840
+ String shardPath = getShardLevelBlobPath (client (), INDEX_NAME , BlobPath .cleanPath (), "0" , TRANSLOG , METADATA ).buildAsString ();
841
+ Path translogMetaDataPath = Path .of (translogRepoPath + "/" + shardPath );
854
842
855
843
try (Stream <Path > files = Files .list (translogMetaDataPath )) {
856
844
files .forEach (p -> {
0 commit comments