65
65
import org .opensearch .cluster .service .ClusterService ;
66
66
import org .opensearch .common .Nullable ;
67
67
import org .opensearch .common .Numbers ;
68
+ import org .opensearch .common .Priority ;
68
69
import org .opensearch .common .SetOnce ;
69
70
import org .opensearch .common .UUIDs ;
70
71
import org .opensearch .common .blobstore .BlobContainer ;
@@ -267,6 +268,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
267
268
268
269
public static final Setting <Boolean > REMOTE_STORE_INDEX_SHALLOW_COPY = Setting .boolSetting ("remote_store_index_shallow_copy" , false );
269
270
271
+ public static final Setting <Boolean > SHALLOW_SNAPSHOT_V2 = Setting .boolSetting ("shallow_snapshot_v2" , false );
272
+
270
273
/**
271
274
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
272
275
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
@@ -1072,6 +1075,7 @@ private void doDeleteShardSnapshots(
1072
1075
repositoryStateId ,
1073
1076
repoMetaVersion ,
1074
1077
Function .identity (),
1078
+ Priority .NORMAL ,
1075
1079
ActionListener .wrap (writeUpdatedRepoDataStep ::onResponse , listener ::onFailure )
1076
1080
);
1077
1081
}, listener ::onFailure );
@@ -1101,39 +1105,46 @@ private void doDeleteShardSnapshots(
1101
1105
} else {
1102
1106
// Write the new repository data first (with the removed snapshot), using no shard generations
1103
1107
final RepositoryData updatedRepoData = repositoryData .removeSnapshots (snapshotIds , ShardGenerations .EMPTY );
1104
- writeIndexGen (updatedRepoData , repositoryStateId , repoMetaVersion , Function .identity (), ActionListener .wrap (newRepoData -> {
1105
- // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1106
- final ActionListener <Void > afterCleanupsListener = new GroupedActionListener <>(
1107
- ActionListener .wrap (() -> listener .onResponse (newRepoData )),
1108
- 2
1109
- );
1110
- cleanupUnlinkedRootAndIndicesBlobs (
1111
- snapshotIds ,
1112
- foundIndices ,
1113
- rootBlobs ,
1114
- newRepoData ,
1115
- remoteStoreLockManagerFactory ,
1116
- afterCleanupsListener
1117
- );
1118
- final StepListener <Collection <ShardSnapshotMetaDeleteResult >> writeMetaAndComputeDeletesStep = new StepListener <>();
1119
- writeUpdatedShardMetaDataAndComputeDeletes (
1120
- snapshotIds ,
1121
- repositoryData ,
1122
- false ,
1123
- remoteStoreLockManagerFactory ,
1124
- writeMetaAndComputeDeletesStep
1125
- );
1126
- writeMetaAndComputeDeletesStep .whenComplete (
1127
- deleteResults -> asyncCleanupUnlinkedShardLevelBlobs (
1128
- repositoryData ,
1108
+ writeIndexGen (
1109
+ updatedRepoData ,
1110
+ repositoryStateId ,
1111
+ repoMetaVersion ,
1112
+ Function .identity (),
1113
+ Priority .NORMAL ,
1114
+ ActionListener .wrap (newRepoData -> {
1115
+ // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1116
+ final ActionListener <Void > afterCleanupsListener = new GroupedActionListener <>(
1117
+ ActionListener .wrap (() -> listener .onResponse (newRepoData )),
1118
+ 2
1119
+ );
1120
+ cleanupUnlinkedRootAndIndicesBlobs (
1129
1121
snapshotIds ,
1130
- deleteResults ,
1122
+ foundIndices ,
1123
+ rootBlobs ,
1124
+ newRepoData ,
1131
1125
remoteStoreLockManagerFactory ,
1132
1126
afterCleanupsListener
1133
- ),
1134
- afterCleanupsListener ::onFailure
1135
- );
1136
- }, listener ::onFailure ));
1127
+ );
1128
+ final StepListener <Collection <ShardSnapshotMetaDeleteResult >> writeMetaAndComputeDeletesStep = new StepListener <>();
1129
+ writeUpdatedShardMetaDataAndComputeDeletes (
1130
+ snapshotIds ,
1131
+ repositoryData ,
1132
+ false ,
1133
+ remoteStoreLockManagerFactory ,
1134
+ writeMetaAndComputeDeletesStep
1135
+ );
1136
+ writeMetaAndComputeDeletesStep .whenComplete (
1137
+ deleteResults -> asyncCleanupUnlinkedShardLevelBlobs (
1138
+ repositoryData ,
1139
+ snapshotIds ,
1140
+ deleteResults ,
1141
+ remoteStoreLockManagerFactory ,
1142
+ afterCleanupsListener
1143
+ ),
1144
+ afterCleanupsListener ::onFailure
1145
+ );
1146
+ }, listener ::onFailure )
1147
+ );
1137
1148
}
1138
1149
}
1139
1150
@@ -1583,6 +1594,7 @@ public void cleanup(
1583
1594
repositoryStateId ,
1584
1595
repositoryMetaVersion ,
1585
1596
Function .identity (),
1597
+ Priority .NORMAL ,
1586
1598
ActionListener .wrap (
1587
1599
v -> cleanupStaleBlobs (
1588
1600
Collections .emptyList (),
@@ -1786,6 +1798,7 @@ public void finalizeSnapshot(
1786
1798
SnapshotInfo snapshotInfo ,
1787
1799
Version repositoryMetaVersion ,
1788
1800
Function <ClusterState , ClusterState > stateTransformer ,
1801
+ Priority repositoryUpdatePriority ,
1789
1802
final ActionListener <RepositoryData > listener
1790
1803
) {
1791
1804
assert repositoryStateId > RepositoryData .UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received ["
@@ -1834,6 +1847,7 @@ public void finalizeSnapshot(
1834
1847
repositoryStateId ,
1835
1848
repositoryMetaVersion ,
1836
1849
stateTransformer ,
1850
+ repositoryUpdatePriority ,
1837
1851
ActionListener .wrap (newRepoData -> {
1838
1852
if (writeShardGens ) {
1839
1853
cleanupOldShardGens (existingRepositoryData , updatedRepositoryData );
@@ -2367,17 +2381,19 @@ public boolean isSystemRepository() {
2367
2381
* Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus
2368
2382
* pending and safe generation are set to the same value marking the end of the update of the repository data.
2369
2383
*
2370
- * @param repositoryData RepositoryData to write
2371
- * @param expectedGen expected repository generation at the start of the operation
2372
- * @param version version of the repository metadata to write
2373
- * @param stateFilter filter for the last cluster state update executed by this method
2384
+ * @param repositoryData RepositoryData to write
2385
+ * @param expectedGen expected repository generation at the start of the operation
2386
+ * @param version version of the repository metadata to write
2387
+ * @param stateFilter filter for the last cluster state update executed by this method
2388
+ * @param repositoryUpdatePriority priority for the cluster state update task
2374
2389
* @param listener completion listener
2375
2390
*/
2376
2391
protected void writeIndexGen (
2377
2392
RepositoryData repositoryData ,
2378
2393
long expectedGen ,
2379
2394
Version version ,
2380
2395
Function <ClusterState , ClusterState > stateFilter ,
2396
+ Priority repositoryUpdatePriority ,
2381
2397
ActionListener <RepositoryData > listener
2382
2398
) {
2383
2399
assert isReadOnly () == false ; // can not write to a read only repository
@@ -2402,7 +2418,7 @@ protected void writeIndexGen(
2402
2418
final StepListener <Long > setPendingStep = new StepListener <>();
2403
2419
clusterService .submitStateUpdateTask (
2404
2420
"set pending repository generation [" + metadata .name () + "][" + expectedGen + "]" ,
2405
- new ClusterStateUpdateTask () {
2421
+ new ClusterStateUpdateTask (repositoryUpdatePriority ) {
2406
2422
2407
2423
private long newGen ;
2408
2424
@@ -2540,7 +2556,7 @@ public void onFailure(Exception e) {
2540
2556
// Step 3: Update CS to reflect new repository generation.
2541
2557
clusterService .submitStateUpdateTask (
2542
2558
"set safe repository generation [" + metadata .name () + "][" + newGen + "]" ,
2543
- new ClusterStateUpdateTask () {
2559
+ new ClusterStateUpdateTask (repositoryUpdatePriority ) {
2544
2560
@ Override
2545
2561
public ClusterState execute (ClusterState currentState ) {
2546
2562
final RepositoryMetadata meta = getRepoMetadata (currentState );
0 commit comments