@@ -1327,7 +1327,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
13271327
13281328 private final Set <RepositoryShardId > currentlyCloning = Collections .synchronizedSet (new HashSet <>());
13291329
1330- private void runReadyClone (
1330+ // Made to package private to be able to test the method in UTs
1331+ void runReadyClone (
13311332 Snapshot target ,
13321333 SnapshotId sourceSnapshot ,
13331334 ShardSnapshotStatus shardStatusBefore ,
@@ -1351,69 +1352,75 @@ public void onFailure(Exception e) {
13511352 @ Override
13521353 protected void doRun () {
13531354 final String localNodeId = clusterService .localNode ().getId ();
1354- repository .getRepositoryData (ActionListener .wrap (repositoryData -> {
1355- try {
1356- final IndexMetadata indexMetadata = repository .getSnapshotIndexMetaData (
1357- repositoryData ,
1355+ if (remoteStoreIndexShallowCopy == false ) {
1356+ executeClone (localNodeId , false );
1357+ } else {
1358+ repository .getRepositoryData (ActionListener .wrap (repositoryData -> {
1359+ try {
1360+ final IndexMetadata indexMetadata = repository .getSnapshotIndexMetaData (
1361+ repositoryData ,
1362+ sourceSnapshot ,
1363+ repoShardId .index ()
1364+ );
1365+ final boolean cloneRemoteStoreIndexShardSnapshot = indexMetadata .getSettings ()
1366+ .getAsBoolean (IndexMetadata .SETTING_REMOTE_STORE_ENABLED , false );
1367+ executeClone (localNodeId , cloneRemoteStoreIndexShardSnapshot );
1368+ } catch (IOException e ) {
1369+ logger .warn ("Failed to get index-metadata from repository data for index [{}]" , repoShardId .index ().getName ());
1370+ failCloneShardAndUpdateClusterState (target , sourceSnapshot , repoShardId );
1371+ }
1372+ }, this ::onFailure ));
1373+ }
1374+ }
1375+
1376+ private void executeClone (String localNodeId , boolean cloneRemoteStoreIndexShardSnapshot ) {
1377+ if (currentlyCloning .add (repoShardId )) {
1378+ if (cloneRemoteStoreIndexShardSnapshot ) {
1379+ repository .cloneRemoteStoreIndexShardSnapshot (
13581380 sourceSnapshot ,
1359- repoShardId .index ()
1381+ target .getSnapshotId (),
1382+ repoShardId ,
1383+ shardStatusBefore .generation (),
1384+ remoteStoreLockManagerFactory ,
1385+ getCloneCompletionListener (localNodeId )
13601386 );
1361- final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy
1362- && indexMetadata .getSettings ().getAsBoolean (IndexMetadata .SETTING_REMOTE_STORE_ENABLED , false );
1363- final SnapshotId targetSnapshot = target .getSnapshotId ();
1364- final ActionListener <String > listener = ActionListener .wrap (
1365- generation -> innerUpdateSnapshotState (
1366- new ShardSnapshotUpdate (
1367- target ,
1368- repoShardId ,
1369- new ShardSnapshotStatus (localNodeId , ShardState .SUCCESS , generation )
1370- ),
1371- ActionListener .runBefore (
1372- ActionListener .wrap (
1373- v -> logger .trace (
1374- "Marked [{}] as successfully cloned from [{}] to [{}]" ,
1375- repoShardId ,
1376- sourceSnapshot ,
1377- targetSnapshot
1378- ),
1379- e -> {
1380- logger .warn ("Cluster state update after successful shard clone [{}] failed" , repoShardId );
1381- failAllListenersOnMasterFailOver (e );
1382- }
1383- ),
1384- () -> currentlyCloning .remove (repoShardId )
1385- )
1386- ),
1387- e -> {
1388- logger .warn ("Exception [{}] while trying to clone shard [{}]" , e , repoShardId );
1389- failCloneShardAndUpdateClusterState (target , sourceSnapshot , repoShardId );
1390- }
1387+ } else {
1388+ repository .cloneShardSnapshot (
1389+ sourceSnapshot ,
1390+ target .getSnapshotId (),
1391+ repoShardId ,
1392+ shardStatusBefore .generation (),
1393+ getCloneCompletionListener (localNodeId )
13911394 );
1392- if (currentlyCloning .add (repoShardId )) {
1393- if (cloneRemoteStoreIndexShardSnapshot ) {
1394- repository .cloneRemoteStoreIndexShardSnapshot (
1395- sourceSnapshot ,
1396- targetSnapshot ,
1395+ }
1396+ }
1397+ }
1398+
1399+ private ActionListener <String > getCloneCompletionListener (String localNodeId ) {
1400+ return ActionListener .wrap (
1401+ generation -> innerUpdateSnapshotState (
1402+ new ShardSnapshotUpdate (target , repoShardId , new ShardSnapshotStatus (localNodeId , ShardState .SUCCESS , generation )),
1403+ ActionListener .runBefore (
1404+ ActionListener .wrap (
1405+ v -> logger .trace (
1406+ "Marked [{}] as successfully cloned from [{}] to [{}]" ,
13971407 repoShardId ,
1398- shardStatusBefore .generation (),
1399- remoteStoreLockManagerFactory ,
1400- listener
1401- );
1402- } else {
1403- repository .cloneShardSnapshot (
14041408 sourceSnapshot ,
1405- targetSnapshot ,
1406- repoShardId ,
1407- shardStatusBefore .generation (),
1408- listener
1409- );
1410- }
1411- }
1412- } catch (IOException e ) {
1413- logger .warn ("Failed to get index-metadata from repository data for index [{}]" , repoShardId .index ().getName ());
1409+ target .getSnapshotId ()
1410+ ),
1411+ e -> {
1412+ logger .warn ("Cluster state update after successful shard clone [{}] failed" , repoShardId );
1413+ failAllListenersOnMasterFailOver (e );
1414+ }
1415+ ),
1416+ () -> currentlyCloning .remove (repoShardId )
1417+ )
1418+ ),
1419+ e -> {
1420+ logger .warn ("Exception [{}] while trying to clone shard [{}]" , e , repoShardId );
14141421 failCloneShardAndUpdateClusterState (target , sourceSnapshot , repoShardId );
14151422 }
1416- }, this :: onFailure ) );
1423+ );
14171424 }
14181425 });
14191426 }
0 commit comments