1616import org .opensearch .common .SetOnce ;
1717import org .opensearch .common .blobstore .BlobMetadata ;
1818import org .opensearch .common .blobstore .BlobPath ;
19+ import org .opensearch .common .blobstore .FetchBlobResult ;
1920import org .opensearch .common .blobstore .stream .write .WritePriority ;
2021import org .opensearch .common .io .VersionedCodecStreamWrapper ;
2122import org .opensearch .common .io .stream .BytesStreamOutput ;
3637import java .nio .file .Files ;
3738import java .nio .file .Path ;
3839import java .util .ArrayList ;
40+ import java .util .Base64 ;
3941import java .util .HashMap ;
4042import java .util .HashSet ;
4143import java .util .List ;
@@ -63,6 +65,7 @@ public class TranslogTransferManager {
6365 private final RemoteTranslogTransferTracker remoteTranslogTransferTracker ;
6466 private final RemoteStoreSettings remoteStoreSettings ;
6567 private static final int METADATA_FILES_TO_FETCH = 10 ;
68+ boolean ckpAsMetadata ;
6669
6770 private final Logger logger ;
6871
@@ -79,7 +82,8 @@ public TranslogTransferManager(
7982 BlobPath remoteMetadataTransferPath ,
8083 FileTransferTracker fileTransferTracker ,
8184 RemoteTranslogTransferTracker remoteTranslogTransferTracker ,
82- RemoteStoreSettings remoteStoreSettings
85+ RemoteStoreSettings remoteStoreSettings ,
86+ boolean ckpAsMetadata
8387 ) {
8488 this .shardId = shardId ;
8589 this .transferService = transferService ;
@@ -89,6 +93,7 @@ public TranslogTransferManager(
8993 this .logger = Loggers .getLogger (getClass (), shardId );
9094 this .remoteTranslogTransferTracker = remoteTranslogTransferTracker ;
9195 this .remoteStoreSettings = remoteStoreSettings ;
96+ this .ckpAsMetadata = ckpAsMetadata ;
9297 }
9398
9499 public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker () {
@@ -110,8 +115,12 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
110115 long prevUploadTimeInMillis = remoteTranslogTransferTracker .getTotalUploadTimeInMillis ();
111116
112117 try {
113- toUpload .addAll (fileTransferTracker .exclusionFilter (transferSnapshot .getTranslogFileSnapshots ()));
114- toUpload .addAll (fileTransferTracker .exclusionFilter ((transferSnapshot .getCheckpointFileSnapshots ())));
118+ if (ckpAsMetadata ) {
119+ toUpload .addAll (fileTransferTracker .exclusionFilter (transferSnapshot .getTranslogFileWithMetadataSnapshots ()));
120+ } else {
121+ toUpload .addAll (fileTransferTracker .exclusionFilter (transferSnapshot .getTranslogFileSnapshots ()));
122+ toUpload .addAll (fileTransferTracker .exclusionFilter ((transferSnapshot .getCheckpointFileSnapshots ())));
123+ }
115124 if (toUpload .isEmpty ()) {
116125 logger .trace ("Nothing to upload for transfer" );
117126 return true ;
@@ -236,15 +245,101 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
236245 generation ,
237246 location
238247 );
239- // Download Checkpoint file from remote to local FS
240248 String ckpFileName = Translog .getCommitCheckpointFileName (Long .parseLong (generation ));
241- downloadToFS (ckpFileName , location , primaryTerm );
242- // Download translog file from remote to local FS
243249 String translogFilename = Translog .getFilename (Long .parseLong (generation ));
244- downloadToFS (translogFilename , location , primaryTerm );
250+ if (ckpAsMetadata == false ) {
251+ // Download Checkpoint file from remote to local FS
252+ downloadToFS (ckpFileName , location , primaryTerm );
253+ // Download translog file from remote to local FS
254+ downloadToFS (translogFilename , location , primaryTerm );
255+ } else {
256+ // Download translog.tlog file with object metadata from remote to local FS
257+ Map <String , String > metadata = downloadTranslogToFSAndGetMetadata (translogFilename , location , primaryTerm , generation );
258+ try {
259+ assert metadata != null && !metadata .isEmpty () && metadata .containsKey ("ckp-data" );
260+ recoverCkpFileFromMetadata (metadata , location , generation , translogFilename );
261+ } catch (Exception e ) {
262+ throw new IOException ("Failed to recover checkpoint file from remote" , e );
263+ }
264+ }
245265 return true ;
246266 }
247267
268+ private Map <String , String > downloadTranslogToFSAndGetMetadata (String fileName , Path location , String primaryTerm , String generation )
269+ throws IOException {
270+ Path filePath = location .resolve (fileName );
271+ // Here, we always override the existing file if present.
272+ // We need to change this logic when we introduce incremental download
273+ deleteFileIfExists (filePath );
274+
275+ boolean downloadStatus = false ;
276+ long bytesToRead = 0 , downloadStartTime = System .nanoTime ();
277+ Map <String , String > metadata ;
278+
279+ FetchBlobResult inputStreamWithMetadata = transferService .downloadBlobWithMetadata (
280+ remoteDataTransferPath .add (primaryTerm ),
281+ fileName
282+ );
283+ try {
284+ InputStream inputStream = inputStreamWithMetadata .getInputStream ();
285+ metadata = inputStreamWithMetadata .getMetadata ();
286+
287+ bytesToRead = inputStream .available ();
288+ Files .copy (inputStream , filePath );
289+ downloadStatus = true ;
290+
291+ } finally {
292+ remoteTranslogTransferTracker .addDownloadTimeInMillis ((System .nanoTime () - downloadStartTime ) / 1_000_000L );
293+ if (downloadStatus ) {
294+ remoteTranslogTransferTracker .addDownloadBytesSucceeded (bytesToRead );
295+ }
296+ }
297+
298+ // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
299+ fileTransferTracker .add (fileName , true );
300+
301+ return metadata ;
302+ }
303+
304+ /**
305+ * Process the provided metadata and tries to write the content of the checkpoint (ckp) file to the FS.
306+ */
307+ private void recoverCkpFileFromMetadata (Map <String , String > metadata , Path location , String generation , String fileName )
308+ throws IOException {
309+
310+ boolean downloadStatus = false ;
311+ long bytesToRead = 0 ;
312+ try {
313+ String ckpFileName = Translog .getCommitCheckpointFileName (Long .parseLong (generation ));
314+ Path filePath = location .resolve (ckpFileName );
315+ // Here, we always override the existing file if present.
316+ deleteFileIfExists (filePath );
317+
318+ String ckpDataBase64 = metadata .get ("ckp-data" );
319+ if (ckpDataBase64 == null ) {
320+ logger .error ("Error processing metadata for translog file: {}" , fileName );
321+ throw new IllegalStateException (
322+ "Checkpoint file data (key - ckp-data) is expected but not found in metadata for file: " + fileName
323+ );
324+ }
325+ byte [] ckpFileBytes = Base64 .getDecoder ().decode (ckpDataBase64 );
326+ bytesToRead = ckpFileBytes .length ;
327+
328+ Files .write (filePath , ckpFileBytes );
329+ downloadStatus = true ;
330+ } finally {
331+ if (downloadStatus ) {
332+ remoteTranslogTransferTracker .addDownloadBytesSucceeded (bytesToRead );
333+ }
334+ }
335+ }
336+
337+ public void deleteFileIfExists (Path filePath ) throws IOException {
338+ if (Files .exists (filePath )) {
339+ Files .delete (filePath );
340+ }
341+ }
342+
248343 private void downloadToFS (String fileName , Path location , String primaryTerm ) throws IOException {
249344 Path filePath = location .resolve (fileName );
250345 // Here, we always override the existing file if present.
@@ -391,7 +486,11 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
391486 // Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
392487 String ckpFileName = Translog .getCommitCheckpointFileName (generation );
393488 String translogFileName = Translog .getFilename (generation );
394- translogFiles .addAll (List .of (ckpFileName , translogFileName ));
489+ if (ckpAsMetadata == false ) {
490+ translogFiles .addAll (List .of (ckpFileName , translogFileName ));
491+ } else {
492+ translogFiles .add (translogFileName );
493+ }
395494 });
396495 // Delete the translog and checkpoint files asynchronously
397496 deleteTranslogFilesAsync (primaryTerm , translogFiles , onCompletion );
0 commit comments