47
47
import java .util .Map ;
48
48
import java .util .Objects ;
49
49
import java .util .concurrent .CountDownLatch ;
50
+ import java .util .concurrent .TimeUnit ;
50
51
import java .util .concurrent .atomic .AtomicLong ;
51
52
52
53
import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_REPLICATION_TYPE ;
@@ -469,6 +470,25 @@ public void testRefreshPersistentFailure() throws Exception {
469
470
assertFalse ("remote store should not in sync" , tuple .v1 ().isRemoteSegmentStoreInSync ());
470
471
}
471
472
473
+ public void testRefreshPersistentFailureAndIndexShardClosed () throws Exception {
474
+ int succeedOnAttempt = 3 ;
475
+ int closeShardOnAttempt = 1 ;
476
+ CountDownLatch refreshCountLatch = new CountDownLatch (1 );
477
+ CountDownLatch successLatch = new CountDownLatch (10 );
478
+ Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > tuple = mockIndexShardWithRetryAndScheduleRefresh (
479
+ succeedOnAttempt ,
480
+ refreshCountLatch ,
481
+ successLatch ,
482
+ true ,
483
+ closeShardOnAttempt
484
+ );
485
+ // Giving 10ms for some iterations of remote refresh upload
486
+ Thread .sleep (TimeUnit .SECONDS .toMillis (2 ));
487
+ RemoteStoreRefreshListener listener = tuple .v1 ();
488
+ assertFalse ("remote store should not in sync" , listener .isRemoteSegmentStoreInSync ());
489
+ assertFalse (listener .getRetryScheduledStatus ());
490
+ }
491
+
472
492
private void assertNoLagAndTotalUploadsFailed (RemoteSegmentTransferTracker segmentTracker , long totalUploadsFailed ) throws Exception {
473
493
assertBusy (() -> {
474
494
assertEquals (0 , segmentTracker .getBytesLag ());
@@ -547,6 +567,49 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
547
567
return mockIndexShardWithRetryAndScheduleRefresh (succeedOnAttempt , refreshCountLatch , successLatch , 1 , noOpLatch );
548
568
}
549
569
570
+ private Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > mockIndexShardWithRetryAndScheduleRefresh (
571
+ int totalAttempt ,
572
+ CountDownLatch refreshCountLatch ,
573
+ CountDownLatch successLatch ,
574
+ int checkpointPublishSucceedOnAttempt ,
575
+ CountDownLatch reachedCheckpointPublishLatch ,
576
+ boolean mockPrimaryTerm ,
577
+ boolean testUploadTimeout
578
+ ) throws IOException {
579
+ return mockIndexShardWithRetryAndScheduleRefresh (
580
+ totalAttempt ,
581
+ refreshCountLatch ,
582
+ successLatch ,
583
+ checkpointPublishSucceedOnAttempt ,
584
+ reachedCheckpointPublishLatch ,
585
+ mockPrimaryTerm ,
586
+ testUploadTimeout ,
587
+ false ,
588
+ 0
589
+ );
590
+ }
591
+
592
+ private Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > mockIndexShardWithRetryAndScheduleRefresh (
593
+ int succeedOnAttempt ,
594
+ CountDownLatch refreshCountLatch ,
595
+ CountDownLatch successLatch ,
596
+ boolean closedShard ,
597
+ int closeShardAfterAttempt
598
+ ) throws IOException {
599
+ CountDownLatch noOpLatch = new CountDownLatch (0 );
600
+ return mockIndexShardWithRetryAndScheduleRefresh (
601
+ succeedOnAttempt ,
602
+ refreshCountLatch ,
603
+ successLatch ,
604
+ 1 ,
605
+ noOpLatch ,
606
+ true ,
607
+ false ,
608
+ closedShard ,
609
+ closeShardAfterAttempt
610
+ );
611
+ }
612
+
550
613
private Tuple <RemoteStoreRefreshListener , RemoteStoreStatsTrackerFactory > mockIndexShardWithRetryAndScheduleRefresh (
551
614
int succeedOnAttempt ,
552
615
CountDownLatch refreshCountLatch ,
@@ -561,7 +624,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
561
624
succeedCheckpointPublishOnAttempt ,
562
625
reachedCheckpointPublishLatch ,
563
626
true ,
564
- false
627
+ false ,
628
+ false ,
629
+ 0
565
630
);
566
631
}
567
632
@@ -572,7 +637,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
572
637
int succeedCheckpointPublishOnAttempt ,
573
638
CountDownLatch reachedCheckpointPublishLatch ,
574
639
boolean mockPrimaryTerm ,
575
- boolean testUploadTimeout
640
+ boolean testUploadTimeout ,
641
+ boolean closeShard ,
642
+ int closeShardAfterAttempt
576
643
) throws IOException {
577
644
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
578
645
indexShard = newStartedShard (
@@ -600,7 +667,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
600
667
IndexShard shard = mock (IndexShard .class );
601
668
Store store = mock (Store .class );
602
669
when (shard .store ()).thenReturn (store );
603
- when (shard .state ()).thenReturn (IndexShardState .STARTED );
604
670
when (store .directory ()).thenReturn (indexShard .store ().directory ());
605
671
606
672
// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -662,6 +728,14 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
662
728
return indexShard .getLatestReplicationCheckpoint ();
663
729
})).when (shard ).computeReplicationCheckpoint (any ());
664
730
731
+ doAnswer ((invocationOnMock -> {
732
+ if (closeShard && counter .get () == closeShardAfterAttempt ) {
733
+ logger .info ("Closing shard..." );
734
+ return IndexShardState .CLOSED ;
735
+ }
736
+ return IndexShardState .STARTED ;
737
+ })).when (shard ).state ();
738
+
665
739
doAnswer (invocation -> {
666
740
if (Objects .nonNull (successLatch )) {
667
741
successLatch .countDown ();
0 commit comments