40
40
import org .opensearch .index .store .remote .metadata .RemoteSegmentMetadata ;
41
41
import org .opensearch .index .store .remote .metadata .RemoteSegmentMetadataHandler ;
42
42
import org .opensearch .indices .replication .checkpoint .ReplicationCheckpoint ;
43
+ import org .opensearch .node .remotestore .RemoteStorePinnedTimestampService ;
43
44
import org .opensearch .threadpool .ThreadPool ;
44
45
45
46
import java .io .FileNotFoundException ;
@@ -91,6 +92,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
91
92
92
93
private final RemoteStoreLockManager mdLockManager ;
93
94
95
+ private final Map <Long , String > metadataFilePinnedTimestampMap ;
96
+
94
97
private final ThreadPool threadPool ;
95
98
96
99
/**
@@ -132,6 +135,7 @@ public RemoteSegmentStoreDirectory(
132
135
this .remoteMetadataDirectory = remoteMetadataDirectory ;
133
136
this .mdLockManager = mdLockManager ;
134
137
this .threadPool = threadPool ;
138
+ this .metadataFilePinnedTimestampMap = new HashMap <>();
135
139
this .logger = Loggers .getLogger (getClass (), shardId );
136
140
init ();
137
141
}
@@ -176,6 +180,42 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
176
180
return remoteSegmentMetadata ;
177
181
}
178
182
183
+ /**
184
+ * Initializes the remote segment metadata to a specific timestamp.
185
+ *
186
+ * @param timestamp The timestamp to initialize the remote segment metadata to.
187
+ * @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
188
+ * @throws IOException If an I/O error occurs while reading the metadata file.
189
+ */
190
+ public RemoteSegmentMetadata initializeToSpecificTimestamp (long timestamp ) throws IOException {
191
+ List <String > metadataFiles = remoteMetadataDirectory .listFilesByPrefixInLexicographicOrder (
192
+ MetadataFilenameUtils .METADATA_PREFIX ,
193
+ Integer .MAX_VALUE
194
+ );
195
+ Set <String > lockedMetadataFiles = RemoteStoreUtils .getPinnedTimestampLockedFiles (
196
+ metadataFiles ,
197
+ Set .of (timestamp ),
198
+ MetadataFilenameUtils ::getTimestamp ,
199
+ MetadataFilenameUtils ::getNodeIdByPrimaryTermAndGen
200
+ );
201
+ if (lockedMetadataFiles .isEmpty ()) {
202
+ return null ;
203
+ }
204
+ if (lockedMetadataFiles .size () > 1 ) {
205
+ throw new IOException (
206
+ "Expected exactly one metadata file matching timestamp: " + timestamp + " but got " + lockedMetadataFiles
207
+ );
208
+ }
209
+ String metadataFile = lockedMetadataFiles .iterator ().next ();
210
+ RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile (metadataFile );
211
+ if (remoteSegmentMetadata != null ) {
212
+ this .segmentsUploadedToRemoteStore = new ConcurrentHashMap <>(remoteSegmentMetadata .getMetadata ());
213
+ } else {
214
+ this .segmentsUploadedToRemoteStore = new ConcurrentHashMap <>();
215
+ }
216
+ return remoteSegmentMetadata ;
217
+ }
218
+
179
219
/**
180
220
* Read the latest metadata file to get the list of segments uploaded to the remote segment store.
181
221
* We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
@@ -324,7 +364,8 @@ public static String getMetadataFilename(
324
364
long translogGeneration ,
325
365
long uploadCounter ,
326
366
int metadataVersion ,
327
- String nodeId
367
+ String nodeId ,
368
+ long creationTimestamp
328
369
) {
329
370
return String .join (
330
371
SEPARATOR ,
@@ -334,11 +375,30 @@ public static String getMetadataFilename(
334
375
RemoteStoreUtils .invertLong (translogGeneration ),
335
376
RemoteStoreUtils .invertLong (uploadCounter ),
336
377
String .valueOf (Objects .hash (nodeId )),
337
- RemoteStoreUtils .invertLong (System . currentTimeMillis () ),
378
+ RemoteStoreUtils .invertLong (creationTimestamp ),
338
379
String .valueOf (metadataVersion )
339
380
);
340
381
}
341
382
383
+ public static String getMetadataFilename (
384
+ long primaryTerm ,
385
+ long generation ,
386
+ long translogGeneration ,
387
+ long uploadCounter ,
388
+ int metadataVersion ,
389
+ String nodeId
390
+ ) {
391
+ return getMetadataFilename (
392
+ primaryTerm ,
393
+ generation ,
394
+ translogGeneration ,
395
+ uploadCounter ,
396
+ metadataVersion ,
397
+ nodeId ,
398
+ System .currentTimeMillis ()
399
+ );
400
+ }
401
+
342
402
// Visible for testing
343
403
static long getPrimaryTerm (String [] filenameTokens ) {
344
404
return RemoteStoreUtils .invertLong (filenameTokens [1 ]);
@@ -793,6 +853,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
793
853
);
794
854
return ;
795
855
}
856
+
796
857
List <String > sortedMetadataFileList = remoteMetadataDirectory .listFilesByPrefixInLexicographicOrder (
797
858
MetadataFilenameUtils .METADATA_PREFIX ,
798
859
Integer .MAX_VALUE
@@ -806,16 +867,44 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
806
867
return ;
807
868
}
808
869
809
- List <String > metadataFilesEligibleToDelete = new ArrayList <>(
810
- sortedMetadataFileList .subList (lastNMetadataFilesToKeep , sortedMetadataFileList .size ())
870
+ // Check last fetch status of pinned timestamps. If stale, return.
871
+ if (RemoteStoreUtils .isPinnedTimestampStateStale ()) {
872
+ logger .warn ("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale" );
873
+ return ;
874
+ }
875
+
876
+ Tuple <Long , Set <Long >> pinnedTimestampsState = RemoteStorePinnedTimestampService .getPinnedTimestamps ();
877
+
878
+ Set <String > implicitLockedFiles = RemoteStoreUtils .getPinnedTimestampLockedFiles (
879
+ sortedMetadataFileList ,
880
+ pinnedTimestampsState .v2 (),
881
+ metadataFilePinnedTimestampMap ,
882
+ MetadataFilenameUtils ::getTimestamp ,
883
+ MetadataFilenameUtils ::getNodeIdByPrimaryTermAndGen
811
884
);
812
- Set <String > allLockFiles ;
885
+ final Set <String > allLockFiles = new HashSet <>(implicitLockedFiles );
886
+
813
887
try {
814
- allLockFiles = ((RemoteStoreMetadataLockManager ) mdLockManager ).fetchLockedMetadataFiles (MetadataFilenameUtils .METADATA_PREFIX );
888
+ allLockFiles .addAll (
889
+ ((RemoteStoreMetadataLockManager ) mdLockManager ).fetchLockedMetadataFiles (MetadataFilenameUtils .METADATA_PREFIX )
890
+ );
815
891
} catch (Exception e ) {
816
892
logger .error ("Exception while fetching segment metadata lock files, skipping deleteStaleSegments" , e );
817
893
return ;
818
894
}
895
+
896
+ List <String > metadataFilesEligibleToDelete = new ArrayList <>(
897
+ sortedMetadataFileList .subList (lastNMetadataFilesToKeep , sortedMetadataFileList .size ())
898
+ );
899
+
900
+ // Along with last N files, we need to keep files since last successful run of scheduler
901
+ long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState .v1 ();
902
+ metadataFilesEligibleToDelete = RemoteStoreUtils .filterOutMetadataFilesBasedOnAge (
903
+ metadataFilesEligibleToDelete ,
904
+ MetadataFilenameUtils ::getTimestamp ,
905
+ lastSuccessfulFetchOfPinnedTimestamps
906
+ );
907
+
819
908
List <String > metadataFilesToBeDeleted = metadataFilesEligibleToDelete .stream ()
820
909
.filter (metadataFile -> allLockFiles .contains (metadataFile ) == false )
821
910
.collect (Collectors .toList ());
0 commit comments