Skip to content

Commit adca083

Browse files
shourya035 parv0201
authored and committed
[Remote Store] Add checks to skip remote uploads after shard is closed (opensearch-project#13998)
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
1 parent 69107ed commit adca083

File tree

2 files changed

+93
-3
lines changed

2 files changed

+93
-3
lines changed

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,13 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
170170
* @return true if sync is needed
171171
*/
172172
private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
173+
// Ignore syncing segments if the underlying shard is closed
174+
// This also makes sure that retries are not scheduled for shards
175+
// with failed syncSegments invocation after they are closed
176+
if (shardClosed()) {
177+
logger.info("Shard is already closed. Not attempting sync to remote store");
178+
return false;
179+
}
173180
boolean shouldSync = didRefresh // If the readers change, didRefresh is always true.
174181
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader
175182
// reference, but it is important to upload the zero state segments so that the restore does not break.
@@ -607,6 +614,15 @@ public void onFailure(String file) {
607614
};
608615
}
609616

617+
/**
618+
* Checks if the underlying IndexShard instance is closed
619+
*
620+
* @return true if it is closed, false otherwise
621+
*/
622+
private boolean shardClosed() {
623+
return indexShard.state() == IndexShardState.CLOSED;
624+
}
625+
610626
@Override
611627
protected Logger getLogger() {
612628
return logger;

server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Map;
4949
import java.util.Objects;
5050
import java.util.concurrent.CountDownLatch;
51+
import java.util.concurrent.TimeUnit;
5152
import java.util.concurrent.atomic.AtomicLong;
5253

5354
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
@@ -470,6 +471,25 @@ public void testRefreshPersistentFailure() throws Exception {
470471
assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
471472
}
472473

474+
public void testRefreshPersistentFailureAndIndexShardClosed() throws Exception {
475+
int succeedOnAttempt = 3;
476+
int closeShardOnAttempt = 1;
477+
CountDownLatch refreshCountLatch = new CountDownLatch(1);
478+
CountDownLatch successLatch = new CountDownLatch(10);
479+
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
480+
succeedOnAttempt,
481+
refreshCountLatch,
482+
successLatch,
483+
true,
484+
closeShardOnAttempt
485+
);
486+
// Giving 2 seconds for some iterations of remote refresh upload
487+
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
488+
RemoteStoreRefreshListener listener = tuple.v1();
489+
assertFalse("remote store should not in sync", listener.isRemoteSegmentStoreInSync());
490+
assertFalse(listener.getRetryScheduledStatus());
491+
}
492+
473493
private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception {
474494
assertBusy(() -> {
475495
assertEquals(0, segmentTracker.getBytesLag());
@@ -548,6 +568,49 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
548568
return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch);
549569
}
550570

571+
private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
572+
int totalAttempt,
573+
CountDownLatch refreshCountLatch,
574+
CountDownLatch successLatch,
575+
int checkpointPublishSucceedOnAttempt,
576+
CountDownLatch reachedCheckpointPublishLatch,
577+
boolean mockPrimaryTerm,
578+
boolean testUploadTimeout
579+
) throws IOException {
580+
return mockIndexShardWithRetryAndScheduleRefresh(
581+
totalAttempt,
582+
refreshCountLatch,
583+
successLatch,
584+
checkpointPublishSucceedOnAttempt,
585+
reachedCheckpointPublishLatch,
586+
mockPrimaryTerm,
587+
testUploadTimeout,
588+
false,
589+
0
590+
);
591+
}
592+
593+
private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
594+
int succeedOnAttempt,
595+
CountDownLatch refreshCountLatch,
596+
CountDownLatch successLatch,
597+
boolean closedShard,
598+
int closeShardAfterAttempt
599+
) throws IOException {
600+
CountDownLatch noOpLatch = new CountDownLatch(0);
601+
return mockIndexShardWithRetryAndScheduleRefresh(
602+
succeedOnAttempt,
603+
refreshCountLatch,
604+
successLatch,
605+
1,
606+
noOpLatch,
607+
true,
608+
false,
609+
closedShard,
610+
closeShardAfterAttempt
611+
);
612+
}
613+
551614
private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
552615
int succeedOnAttempt,
553616
CountDownLatch refreshCountLatch,
@@ -562,7 +625,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
562625
succeedCheckpointPublishOnAttempt,
563626
reachedCheckpointPublishLatch,
564627
true,
565-
false
628+
false,
629+
false,
630+
0
566631
);
567632
}
568633

@@ -573,7 +638,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
573638
int succeedCheckpointPublishOnAttempt,
574639
CountDownLatch reachedCheckpointPublishLatch,
575640
boolean mockPrimaryTerm,
576-
boolean testUploadTimeout
641+
boolean testUploadTimeout,
642+
boolean closeShard,
643+
int closeShardAfterAttempt
577644
) throws IOException {
578645
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
579646
indexShard = newStartedShard(
@@ -601,7 +668,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
601668
IndexShard shard = mock(IndexShard.class);
602669
Store store = mock(Store.class);
603670
when(shard.store()).thenReturn(store);
604-
when(shard.state()).thenReturn(IndexShardState.STARTED);
605671
when(store.directory()).thenReturn(indexShard.store().directory());
606672

607673
// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -663,6 +729,14 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
663729
return indexShard.getLatestReplicationCheckpoint();
664730
})).when(shard).computeReplicationCheckpoint(any());
665731

732+
doAnswer((invocationOnMock -> {
733+
if (closeShard && counter.get() == closeShardAfterAttempt) {
734+
logger.info("Closing shard...");
735+
return IndexShardState.CLOSED;
736+
}
737+
return IndexShardState.STARTED;
738+
})).when(shard).state();
739+
666740
doAnswer(invocation -> {
667741
if (Objects.nonNull(successLatch)) {
668742
successLatch.countDown();

0 commit comments

Comments
 (0)