4040import org .opensearch .index .store .remote .metadata .RemoteSegmentMetadata ;
4141import org .opensearch .index .store .remote .metadata .RemoteSegmentMetadataHandler ;
4242import org .opensearch .indices .replication .checkpoint .ReplicationCheckpoint ;
43+ import org .opensearch .node .remotestore .RemoteStorePinnedTimestampService ;
4344import org .opensearch .threadpool .ThreadPool ;
4445
4546import java .io .FileNotFoundException ;
@@ -91,6 +92,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
9192
9293 private final RemoteStoreLockManager mdLockManager ;
9394
95+ private final Map <Long , String > metadataFilePinnedTimestampMap ;
96+
9497 private final ThreadPool threadPool ;
9598
9699 /**
@@ -132,6 +135,7 @@ public RemoteSegmentStoreDirectory(
132135 this .remoteMetadataDirectory = remoteMetadataDirectory ;
133136 this .mdLockManager = mdLockManager ;
134137 this .threadPool = threadPool ;
138+ this .metadataFilePinnedTimestampMap = new HashMap <>();
135139 this .logger = Loggers .getLogger (getClass (), shardId );
136140 init ();
137141 }
@@ -176,6 +180,42 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
176180 return remoteSegmentMetadata ;
177181 }
178182
183+ /**
184+ * Initializes the remote segment metadata to a specific timestamp.
185+ *
186+ * @param timestamp The timestamp to initialize the remote segment metadata to.
187+ * @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
188+ * @throws IOException If an I/O error occurs while reading the metadata file.
189+ */
190+ public RemoteSegmentMetadata initializeToSpecificTimestamp (long timestamp ) throws IOException {
191+ List <String > metadataFiles = remoteMetadataDirectory .listFilesByPrefixInLexicographicOrder (
192+ MetadataFilenameUtils .METADATA_PREFIX ,
193+ Integer .MAX_VALUE
194+ );
195+ Set <String > lockedMetadataFiles = RemoteStoreUtils .getPinnedTimestampLockedFiles (
196+ metadataFiles ,
197+ Set .of (timestamp ),
198+ MetadataFilenameUtils ::getTimestamp ,
199+ MetadataFilenameUtils ::getNodeIdByPrimaryTermAndGen
200+ );
201+ if (lockedMetadataFiles .isEmpty ()) {
202+ return null ;
203+ }
204+ if (lockedMetadataFiles .size () > 1 ) {
205+ throw new IOException (
206+ "Expected exactly one metadata file matching timestamp: " + timestamp + " but got " + lockedMetadataFiles
207+ );
208+ }
209+ String metadataFile = lockedMetadataFiles .iterator ().next ();
210+ RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile (metadataFile );
211+ if (remoteSegmentMetadata != null ) {
212+ this .segmentsUploadedToRemoteStore = new ConcurrentHashMap <>(remoteSegmentMetadata .getMetadata ());
213+ } else {
214+ this .segmentsUploadedToRemoteStore = new ConcurrentHashMap <>();
215+ }
216+ return remoteSegmentMetadata ;
217+ }
218+
179219 /**
180220 * Read the latest metadata file to get the list of segments uploaded to the remote segment store.
181221 * We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
@@ -324,7 +364,8 @@ public static String getMetadataFilename(
324364 long translogGeneration ,
325365 long uploadCounter ,
326366 int metadataVersion ,
327- String nodeId
367+ String nodeId ,
368+ long creationTimestamp
328369 ) {
329370 return String .join (
330371 SEPARATOR ,
@@ -334,11 +375,30 @@ public static String getMetadataFilename(
334375 RemoteStoreUtils .invertLong (translogGeneration ),
335376 RemoteStoreUtils .invertLong (uploadCounter ),
336377 String .valueOf (Objects .hash (nodeId )),
337- RemoteStoreUtils .invertLong (System . currentTimeMillis () ),
378+ RemoteStoreUtils .invertLong (creationTimestamp ),
338379 String .valueOf (metadataVersion )
339380 );
340381 }
341382
383+ public static String getMetadataFilename (
384+ long primaryTerm ,
385+ long generation ,
386+ long translogGeneration ,
387+ long uploadCounter ,
388+ int metadataVersion ,
389+ String nodeId
390+ ) {
391+ return getMetadataFilename (
392+ primaryTerm ,
393+ generation ,
394+ translogGeneration ,
395+ uploadCounter ,
396+ metadataVersion ,
397+ nodeId ,
398+ System .currentTimeMillis ()
399+ );
400+ }
401+
342402 // Visible for testing
343403 static long getPrimaryTerm (String [] filenameTokens ) {
344404 return RemoteStoreUtils .invertLong (filenameTokens [1 ]);
@@ -778,6 +838,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
778838 );
779839 return ;
780840 }
841+
781842 List <String > sortedMetadataFileList = remoteMetadataDirectory .listFilesByPrefixInLexicographicOrder (
782843 MetadataFilenameUtils .METADATA_PREFIX ,
783844 Integer .MAX_VALUE
@@ -791,16 +852,44 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
791852 return ;
792853 }
793854
794- List <String > metadataFilesEligibleToDelete = new ArrayList <>(
795- sortedMetadataFileList .subList (lastNMetadataFilesToKeep , sortedMetadataFileList .size ())
855+ // Check last fetch status of pinned timestamps. If stale, return.
856+ if (RemoteStoreUtils .isPinnedTimestampStateStale ()) {
857+ logger .warn ("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale" );
858+ return ;
859+ }
860+
861+ Tuple <Long , Set <Long >> pinnedTimestampsState = RemoteStorePinnedTimestampService .getPinnedTimestamps ();
862+
863+ Set <String > implicitLockedFiles = RemoteStoreUtils .getPinnedTimestampLockedFiles (
864+ sortedMetadataFileList ,
865+ pinnedTimestampsState .v2 (),
866+ metadataFilePinnedTimestampMap ,
867+ MetadataFilenameUtils ::getTimestamp ,
868+ MetadataFilenameUtils ::getNodeIdByPrimaryTermAndGen
796869 );
797- Set <String > allLockFiles ;
870+ final Set <String > allLockFiles = new HashSet <>(implicitLockedFiles );
871+
798872 try {
799- allLockFiles = ((RemoteStoreMetadataLockManager ) mdLockManager ).fetchLockedMetadataFiles (MetadataFilenameUtils .METADATA_PREFIX );
873+ allLockFiles .addAll (
874+ ((RemoteStoreMetadataLockManager ) mdLockManager ).fetchLockedMetadataFiles (MetadataFilenameUtils .METADATA_PREFIX )
875+ );
800876 } catch (Exception e ) {
801877 logger .error ("Exception while fetching segment metadata lock files, skipping deleteStaleSegments" , e );
802878 return ;
803879 }
880+
881+ List <String > metadataFilesEligibleToDelete = new ArrayList <>(
882+ sortedMetadataFileList .subList (lastNMetadataFilesToKeep , sortedMetadataFileList .size ())
883+ );
884+
885+ // Along with last N files, we need to keep files since last successful run of scheduler
886+ long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState .v1 ();
887+ metadataFilesEligibleToDelete = RemoteStoreUtils .filterOutMetadataFilesBasedOnAge (
888+ metadataFilesEligibleToDelete ,
889+ MetadataFilenameUtils ::getTimestamp ,
890+ lastSuccessfulFetchOfPinnedTimestamps
891+ );
892+
804893 List <String > metadataFilesToBeDeleted = metadataFilesEligibleToDelete .stream ()
805894 .filter (metadataFile -> allLockFiles .contains (metadataFile ) == false )
806895 .collect (Collectors .toList ());
0 commit comments