@@ -1327,7 +1327,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
1327
1327
1328
1328
private final Set <RepositoryShardId > currentlyCloning = Collections .synchronizedSet (new HashSet <>());
1329
1329
1330
- private void runReadyClone (
1330
+ // Made to package private to be able to test the method in UTs
1331
+ void runReadyClone (
1331
1332
Snapshot target ,
1332
1333
SnapshotId sourceSnapshot ,
1333
1334
ShardSnapshotStatus shardStatusBefore ,
@@ -1351,69 +1352,75 @@ public void onFailure(Exception e) {
1351
1352
@ Override
1352
1353
protected void doRun () {
1353
1354
final String localNodeId = clusterService .localNode ().getId ();
1354
- repository .getRepositoryData (ActionListener .wrap (repositoryData -> {
1355
- try {
1356
- final IndexMetadata indexMetadata = repository .getSnapshotIndexMetaData (
1357
- repositoryData ,
1355
+ if (remoteStoreIndexShallowCopy == false ) {
1356
+ executeClone (localNodeId , false );
1357
+ } else {
1358
+ repository .getRepositoryData (ActionListener .wrap (repositoryData -> {
1359
+ try {
1360
+ final IndexMetadata indexMetadata = repository .getSnapshotIndexMetaData (
1361
+ repositoryData ,
1362
+ sourceSnapshot ,
1363
+ repoShardId .index ()
1364
+ );
1365
+ final boolean cloneRemoteStoreIndexShardSnapshot = indexMetadata .getSettings ()
1366
+ .getAsBoolean (IndexMetadata .SETTING_REMOTE_STORE_ENABLED , false );
1367
+ executeClone (localNodeId , cloneRemoteStoreIndexShardSnapshot );
1368
+ } catch (IOException e ) {
1369
+ logger .warn ("Failed to get index-metadata from repository data for index [{}]" , repoShardId .index ().getName ());
1370
+ failCloneShardAndUpdateClusterState (target , sourceSnapshot , repoShardId );
1371
+ }
1372
+ }, this ::onFailure ));
1373
+ }
1374
+ }
1375
+
1376
+ private void executeClone (String localNodeId , boolean cloneRemoteStoreIndexShardSnapshot ) {
1377
+ if (currentlyCloning .add (repoShardId )) {
1378
+ if (cloneRemoteStoreIndexShardSnapshot ) {
1379
+ repository .cloneRemoteStoreIndexShardSnapshot (
1358
1380
sourceSnapshot ,
1359
- repoShardId .index ()
1381
+ target .getSnapshotId (),
1382
+ repoShardId ,
1383
+ shardStatusBefore .generation (),
1384
+ remoteStoreLockManagerFactory ,
1385
+ getCloneCompletionListener (localNodeId )
1360
1386
);
1361
- final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy
1362
- && indexMetadata .getSettings ().getAsBoolean (IndexMetadata .SETTING_REMOTE_STORE_ENABLED , false );
1363
- final SnapshotId targetSnapshot = target .getSnapshotId ();
1364
- final ActionListener <String > listener = ActionListener .wrap (
1365
- generation -> innerUpdateSnapshotState (
1366
- new ShardSnapshotUpdate (
1367
- target ,
1368
- repoShardId ,
1369
- new ShardSnapshotStatus (localNodeId , ShardState .SUCCESS , generation )
1370
- ),
1371
- ActionListener .runBefore (
1372
- ActionListener .wrap (
1373
- v -> logger .trace (
1374
- "Marked [{}] as successfully cloned from [{}] to [{}]" ,
1375
- repoShardId ,
1376
- sourceSnapshot ,
1377
- targetSnapshot
1378
- ),
1379
- e -> {
1380
- logger .warn ("Cluster state update after successful shard clone [{}] failed" , repoShardId );
1381
- failAllListenersOnMasterFailOver (e );
1382
- }
1383
- ),
1384
- () -> currentlyCloning .remove (repoShardId )
1385
- )
1386
- ),
1387
- e -> {
1388
- logger .warn ("Exception [{}] while trying to clone shard [{}]" , e , repoShardId );
1389
- failCloneShardAndUpdateClusterState (target , sourceSnapshot , repoShardId );
1390
- }
1387
+ } else {
1388
+ repository .cloneShardSnapshot (
1389
+ sourceSnapshot ,
1390
+ target .getSnapshotId (),
1391
+ repoShardId ,
1392
+ shardStatusBefore .generation (),
1393
+ getCloneCompletionListener (localNodeId )
1391
1394
);
1392
- if (currentlyCloning .add (repoShardId )) {
1393
- if (cloneRemoteStoreIndexShardSnapshot ) {
1394
- repository .cloneRemoteStoreIndexShardSnapshot (
1395
- sourceSnapshot ,
1396
- targetSnapshot ,
1395
+ }
1396
+ }
1397
+ }
1398
+
1399
+ private ActionListener <String > getCloneCompletionListener (String localNodeId ) {
1400
+ return ActionListener .wrap (
1401
+ generation -> innerUpdateSnapshotState (
1402
+ new ShardSnapshotUpdate (target , repoShardId , new ShardSnapshotStatus (localNodeId , ShardState .SUCCESS , generation )),
1403
+ ActionListener .runBefore (
1404
+ ActionListener .wrap (
1405
+ v -> logger .trace (
1406
+ "Marked [{}] as successfully cloned from [{}] to [{}]" ,
1397
1407
repoShardId ,
1398
- shardStatusBefore .generation (),
1399
- remoteStoreLockManagerFactory ,
1400
- listener
1401
- );
1402
- } else {
1403
- repository .cloneShardSnapshot (
1404
1408
sourceSnapshot ,
1405
- targetSnapshot ,
1406
- repoShardId ,
1407
- shardStatusBefore .generation (),
1408
- listener
1409
- );
1410
- }
1411
- }
1412
- } catch (IOException e ) {
1413
- logger .warn ("Failed to get index-metadata from repository data for index [{}]" , repoShardId .index ().getName ());
1409
+ target .getSnapshotId ()
1410
+ ),
1411
+ e -> {
1412
+ logger .warn ("Cluster state update after successful shard clone [{}] failed" , repoShardId );
1413
+ failAllListenersOnMasterFailOver (e );
1414
+ }
1415
+ ),
1416
+ () -> currentlyCloning .remove (repoShardId )
1417
+ )
1418
+ ),
1419
+ e -> {
1420
+ logger .warn ("Exception [{}] while trying to clone shard [{}]" , e , repoShardId );
1414
1421
failCloneShardAndUpdateClusterState (target , sourceSnapshot , repoShardId );
1415
1422
}
1416
- }, this :: onFailure ) );
1423
+ );
1417
1424
}
1418
1425
});
1419
1426
}
0 commit comments