Skip to content

Commit a42c6b8

Browse files
sachinpkaleSachin Kale
authored andcommitted
Integrate translog cleanup with snapshot deletion and fix primary term deletion logic (opensearch-project#15657)
--------- Signed-off-by: Sachin Kale <kalsac@amazon.com> Co-authored-by: Sachin Kale <kalsac@amazon.com>
1 parent 465bed7 commit a42c6b8

File tree

11 files changed

+422
-45
lines changed

11 files changed

+422
-45
lines changed

server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotITV2.java renamed to server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import static org.hamcrest.Matchers.lessThan;
3333

3434
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
35-
public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase {
35+
public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase {
3636

3737
private static final String REMOTE_REPO_NAME = "remote-store-repo-name";
3838

@@ -276,9 +276,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
276276
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
277277
Path shardPath = Path.of(String.valueOf(indexPath), "0");
278278
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
279+
Path translogPath = Path.of(String.valueOf(shardPath), "translog");
279280

280281
// Get total segments remote store directory file count for deleted index and shard 0
281282
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
283+
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);
282284

283285
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
284286

@@ -312,6 +314,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
312314
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
313315
} catch (Exception e) {}
314316
}, 60, TimeUnit.SECONDS);
317+
318+
assertBusy(() -> {
319+
try {
320+
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
321+
} catch (Exception e) {}
322+
}, 60, TimeUnit.SECONDS);
323+
315324
}
316325

317326
private Settings snapshotV2Settings(Path remoteStoreRepoPath) {

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ static long getGeneration(String[] filenameTokens) {
412412

413413
public static long getTimestamp(String filename) {
414414
String[] filenameTokens = filename.split(SEPARATOR);
415-
return RemoteStoreUtils.invertLong(filenameTokens[6]);
415+
return RemoteStoreUtils.invertLong(filenameTokens[filenameTokens.length - 2]);
416416
}
417417

418418
public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename) {

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
116116
}
117117
}
118118

119+
public Supplier<RepositoriesService> getRepositoriesService() {
120+
return this.repositoriesService;
121+
}
122+
119123
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

Lines changed: 158 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.translog;
1010

11+
import org.apache.logging.log4j.LogManager;
1112
import org.apache.logging.log4j.Logger;
1213
import org.opensearch.cluster.service.ClusterService;
1314
import org.opensearch.common.blobstore.BlobMetadata;
@@ -33,6 +34,7 @@
3334
import java.util.Optional;
3435
import java.util.Set;
3536
import java.util.TreeSet;
37+
import java.util.concurrent.atomic.AtomicLong;
3638
import java.util.function.BooleanSupplier;
3739
import java.util.function.LongConsumer;
3840
import java.util.function.LongSupplier;
@@ -52,10 +54,13 @@
5254
*/
5355
public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
5456

57+
private static Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslog.class);
5558
private final Logger logger;
5659
private final Map<Long, String> metadataFilePinnedTimestampMap;
5760
// For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads.
5861
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
62+
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
63+
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
5964

6065
public RemoteFsTimestampAwareTranslog(
6166
TranslogConfig config,
@@ -86,6 +91,7 @@ public RemoteFsTimestampAwareTranslog(
8691
logger = Loggers.getLogger(getClass(), shardId);
8792
this.metadataFilePinnedTimestampMap = new HashMap<>();
8893
this.oldFormatMetadataFileGenerationMap = new HashMap<>();
94+
this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>();
8995
}
9096

9197
@Override
@@ -165,7 +171,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
165171
return;
166172
}
167173

168-
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles);
174+
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
175+
metadataFiles,
176+
metadataFilePinnedTimestampMap,
177+
logger
178+
);
169179

170180
// If index is not deleted, make sure to keep latest metadata file
171181
if (indexDeleted == false) {
@@ -209,7 +219,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
209219
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
210220

211221
// Delete stale primary terms
212-
deleteStaleRemotePrimaryTerms(metadataFiles);
222+
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
213223
} else {
214224
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
215225
}
@@ -259,8 +269,16 @@ protected Set<Long> getGenerationsToBeDeleted(
259269
return generationsToBeDeleted;
260270
}
261271

262-
// Visible for testing
263272
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
273+
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
274+
}
275+
276+
// Visible for testing
277+
protected static List<String> getMetadataFilesToBeDeleted(
278+
List<String> metadataFiles,
279+
Map<Long, String> metadataFilePinnedTimestampMap,
280+
Logger logger
281+
) {
264282
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
265283

266284
// Keep files since last successful run of scheduler
@@ -351,27 +369,153 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
351369
}
352370
}
353371

372+
private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
373+
deleteStaleRemotePrimaryTerms(
374+
metadataFiles,
375+
translogTransferManager,
376+
oldFormatMetadataFilePrimaryTermMap,
377+
minPrimaryTermInRemote,
378+
logger
379+
);
380+
}
381+
354382
/**
355383
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
356384
* implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
357385
* <br>
358386
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
359387
*/
360-
private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
388+
protected static void deleteStaleRemotePrimaryTerms(
389+
List<String> metadataFiles,
390+
TranslogTransferManager translogTransferManager,
391+
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
392+
AtomicLong minPrimaryTermInRemoteAtomicLong,
393+
Logger logger
394+
) {
361395
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
362396
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
363397
// of older primary term.
364-
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
365-
if (metadataFiles.isEmpty()) {
366-
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
367-
return;
398+
if (metadataFiles.isEmpty()) {
399+
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
400+
return;
401+
}
402+
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> {
403+
try {
404+
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
405+
} catch (IOException e) {
406+
return Long.MAX_VALUE;
407+
}
408+
}).min(Long::compareTo);
409+
// First we delete all stale primary terms folders from remote store
410+
Long minPrimaryTermInRemote = getMinPrimaryTermInRemote(minPrimaryTermInRemoteAtomicLong, translogTransferManager, logger);
411+
if (minPrimaryTermFromMetadataFiles.get() > minPrimaryTermInRemote) {
412+
translogTransferManager.deletePrimaryTermsAsync(minPrimaryTermFromMetadataFiles.get());
413+
minPrimaryTermInRemoteAtomicLong.set(minPrimaryTermFromMetadataFiles.get());
414+
} else {
415+
logger.debug(
416+
"Skipping primary term cleanup. minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}",
417+
minPrimaryTermFromMetadataFiles.get(),
418+
minPrimaryTermInRemote
419+
);
420+
}
421+
}
422+
423+
private static Long getMinPrimaryTermInRemote(
424+
AtomicLong minPrimaryTermInRemote,
425+
TranslogTransferManager translogTransferManager,
426+
Logger logger
427+
) {
428+
if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) {
429+
try {
430+
Set<Long> primaryTermsInRemote = translogTransferManager.listPrimaryTermsInRemote();
431+
if (primaryTermsInRemote.isEmpty() == false) {
432+
Optional<Long> minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo);
433+
minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set);
434+
}
435+
} catch (IOException e) {
436+
logger.error("Exception while listing primary terms in remote translog", e);
437+
}
438+
}
439+
return minPrimaryTermInRemote.get();
440+
}
441+
442+
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
443+
String metadataFile,
444+
TranslogTransferManager translogTransferManager,
445+
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
446+
) throws IOException {
447+
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
448+
if (minMaxPrimaryTermFromFileName != null) {
449+
return minMaxPrimaryTermFromFileName;
450+
} else {
451+
if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) {
452+
return oldFormatMetadataFilePrimaryTermMap.get(metadataFile);
453+
} else {
454+
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
455+
long maxPrimaryTem = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile);
456+
long minPrimaryTem = -1;
457+
if (metadata.getGenerationToPrimaryTermMapper() != null
458+
&& metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) {
459+
Optional<Long> primaryTerm = metadata.getGenerationToPrimaryTermMapper()
460+
.values()
461+
.stream()
462+
.map(s -> Long.parseLong(s))
463+
.min(Long::compareTo);
464+
if (primaryTerm.isPresent()) {
465+
minPrimaryTem = primaryTerm.get();
466+
}
467+
}
468+
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
469+
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
470+
return minMaxPrimaryTermTuple;
368471
}
369-
Optional<Long> minPrimaryTerm = metadataFiles.stream()
370-
.map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1]))
371-
.min(Long::compareTo);
372-
// First we delete all stale primary terms folders from remote store
373-
long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1;
374-
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
375472
}
376473
}
474+
475+
public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
476+
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
477+
@Override
478+
public void onResponse(List<BlobMetadata> blobMetadata) {
479+
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
480+
481+
try {
482+
if (metadataFiles.isEmpty()) {
483+
staticLogger.debug("No stale translog metadata files found");
484+
return;
485+
}
486+
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
487+
if (metadataFilesToBeDeleted.isEmpty()) {
488+
staticLogger.debug("No metadata files to delete");
489+
return;
490+
}
491+
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
492+
493+
// For all the files that we are keeping, fetch min and max generations
494+
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
495+
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
496+
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
497+
498+
// Delete stale metadata files
499+
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
500+
501+
// Delete stale primary terms
502+
deleteStaleRemotePrimaryTerms(
503+
metadataFilesNotToBeDeleted,
504+
translogTransferManager,
505+
new HashMap<>(),
506+
new AtomicLong(Long.MAX_VALUE),
507+
staticLogger
508+
);
509+
} catch (Exception e) {
510+
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
511+
}
512+
}
513+
514+
@Override
515+
public void onFailure(Exception e) {
516+
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
517+
}
518+
};
519+
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
520+
}
377521
}

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,14 @@ public void onFailure(Exception e) {
545545
});
546546
}
547547

548+
public Set<Long> listPrimaryTermsInRemote() throws IOException {
549+
Set<String> primaryTermsStr = transferService.listFolders(remoteDataTransferPath);
550+
if (primaryTermsStr != null) {
551+
return primaryTermsStr.stream().map(Long::parseLong).collect(Collectors.toSet());
552+
}
553+
return new HashSet<>();
554+
}
555+
548556
/**
549557
* Handles deletion of all translog files associated with a primary term.
550558
*

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Arrays;
2020
import java.util.Map;
2121
import java.util.Objects;
22+
import java.util.Optional;
2223

2324
/**
2425
* The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the
@@ -108,11 +109,28 @@ public String getFileName() {
108109
RemoteStoreUtils.invertLong(createdAt),
109110
String.valueOf(Objects.hash(nodeId)),
110111
RemoteStoreUtils.invertLong(minTranslogGeneration),
112+
String.valueOf(getMinPrimaryTermReferred()),
111113
String.valueOf(CURRENT_VERSION)
112114
)
113115
);
114116
}
115117

118+
private long getMinPrimaryTermReferred() {
119+
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
120+
return -1;
121+
}
122+
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
123+
.values()
124+
.stream()
125+
.map(s -> Long.parseLong(s))
126+
.min(Long::compareTo);
127+
if (minPrimaryTerm.isPresent()) {
128+
return minPrimaryTerm.get();
129+
} else {
130+
return -1;
131+
}
132+
}
133+
116134
public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
117135
String[] tokens = filename.split(METADATA_SEPARATOR);
118136
if (tokens.length < 6) {
@@ -143,15 +161,43 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
143161
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
144162
try {
145163
// instead of direct index, we go backwards to avoid running into same separator in nodeId
146-
String minGeneration = tokens[tokens.length - 2];
164+
String minGeneration = tokens[tokens.length - 3];
147165
String maxGeneration = tokens[2];
148166
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
149-
} catch (NumberFormatException e) {
167+
} catch (Exception e) {
150168
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
151169
return null;
152170
}
153171
}
154172

173+
public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
174+
String[] tokens = filename.split(METADATA_SEPARATOR);
175+
if (tokens.length < 7) {
176+
// For versions < 2.17, we don't have min primary term.
177+
return null;
178+
}
179+
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
180+
try {
181+
// instead of direct index, we go backwards to avoid running into same separator in nodeId
182+
String minPrimaryTerm = tokens[tokens.length - 2];
183+
String maxPrimaryTerm = tokens[1];
184+
return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
185+
} catch (Exception e) {
186+
logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
187+
return null;
188+
}
189+
}
190+
191+
public static long getPrimaryTermFromFileName(String filename) {
192+
String[] tokens = filename.split(METADATA_SEPARATOR);
193+
try {
194+
return RemoteStoreUtils.invertLong(tokens[1]);
195+
} catch (Exception e) {
196+
logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
197+
return -1;
198+
}
199+
}
200+
155201
@Override
156202
public int hashCode() {
157203
return Objects.hash(primaryTerm, generation);

0 commit comments

Comments
 (0)