
Commit aeb88b9

mch2 authored and shiv0408 committed
Remove compounding retries within PrimaryShardReplicationSource (opensearch-project#12043)
This change removes retries within PrimaryShardReplicationSource and relies on retries in a single place at the start of replication, inside SegmentReplicationTargetService's processLatestReceivedCheckpoint, which runs after a failure or success occurs. The timeout on the removed retries is the cause of flaky failures in SegmentReplication's bwc test within IndexingIT that can occur on node disconnect: the retries persist for over ~1 minute against a primary node that has been relocated or shut down and cause the test to time out.

This change also simplifies the cancellation flow on the target service before the shard is closed. Previously we "requested" a cancel that did not remove the target from the ongoing-replications collection until a cancellation failure was thrown. The transport calls from PrimaryShardReplicationSource are no longer wrapped in CancellableThreads by the client, so a call to cancel will not throw; instead we now immediately remove the target and decref/close it.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
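For context, the "retries in one place" referenced above live in SegmentReplicationTargetService#processLatestReceivedCheckpoint. Below is a condensed sketch of that flow, adapted from the diff further down in this commit; it is not a standalone, compilable excerpt, and the fields it touches (indicesService, latestReceivedCheckpoint, threadPool) belong to the surrounding service class.

// On a failed or completed replication the service re-checks the latest received
// checkpoint once, instead of retrying individual transport calls to the primary.
Runnable runnable = () -> {
    // if we retry, ensure the shard is not in the process of being closed;
    // it is removed from the IndicesService collection before it is marked as closed.
    if (indicesService.getShardOrNull(replicaShard.shardId()) != null) {
        onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
    }
};
// fork off the calling thread onto the generic pool when necessary
if (thread == Thread.currentThread()) {
    threadPool.generic().execute(runnable);
} else {
    runnable.run();
}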
1 parent 175e8c6 · commit aeb88b9

8 files changed: +133 −123 lines changed


qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java

Lines changed: 0 additions & 1 deletion
@@ -262,7 +262,6 @@ public void testIndexing() throws Exception {
      * @throws Exception if index creation fail
      * @throws UnsupportedOperationException if cluster type is unknown
      */
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7679")
     public void testIndexingWithSegRep() throws Exception {
         if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
             logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 61 additions & 0 deletions
@@ -594,6 +594,67 @@ public void testCancellation() throws Exception {
         assertDocCounts(docCount, primaryNode);
     }
 
+    public void testCancellationDuringGetCheckpointInfo() throws Exception {
+        cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO);
+    }
+
+    public void testCancellationDuringGetSegments() throws Exception {
+        cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES);
+    }
+
+    private void cancelDuringReplicaAction(String actionToblock) throws Exception {
+        // this test stubs transport calls specific to node-node replication.
+        assumeFalse(
+            "Skipping the test as its not compatible with segment replication with remote store.",
+            segmentReplicationWithRemoteEnabled()
+        );
+        final String primaryNode = internalCluster().startDataOnlyNode();
+        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
+        ensureYellow(INDEX_NAME);
+
+        final String replicaNode = internalCluster().startDataOnlyNode();
+        ensureGreen(INDEX_NAME);
+        final SegmentReplicationTargetService targetService = internalCluster().getInstance(
+            SegmentReplicationTargetService.class,
+            replicaNode
+        );
+        final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
+        CountDownLatch startCancellationLatch = new CountDownLatch(1);
+        CountDownLatch latch = new CountDownLatch(1);
+
+        MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            primaryNode
+        );
+        primaryTransportService.addRequestHandlingBehavior(actionToblock, (handler, request, channel, task) -> {
+            logger.info("action {}", actionToblock);
+            try {
+                startCancellationLatch.countDown();
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        // index a doc and trigger replication
+        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+
+        // remove the replica and ensure it is cleaned up.
+        startCancellationLatch.await();
+        SegmentReplicationTarget target = targetService.get(replicaShard.shardId());
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(INDEX_NAME)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
+        );
+        assertEquals("Replication not closed: " + target.getId(), 0, target.refCount());
+        assertEquals("Store has a positive refCount", 0, replicaShard.store().refCount());
+        // stop the replica, this will do additional checks on shutDown to ensure the replica and its store are closed properly
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
+        latch.countDown();
+    }
+
     public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());

server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java

Lines changed: 22 additions & 31 deletions
@@ -23,8 +23,6 @@
 import org.opensearch.indices.replication.SegmentReplicationState;
 import org.opensearch.indices.replication.SegmentReplicationTarget;
 import org.opensearch.indices.replication.SegmentReplicationTargetService;
-import org.opensearch.indices.replication.common.ReplicationCollection;
-import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.disruption.SlowClusterStateProcessing;
 
@@ -33,6 +31,8 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
 /**
  * This class runs tests with remote store + segRep while blocking file downloads
  */
@@ -59,22 +59,18 @@ public void testCancelReplicationWhileSyncingSegments() throws Exception {
         indexSingleDoc();
         refresh(INDEX_NAME);
         waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
-        final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
-        assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage());
-        ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
-            state.getReplicationId()
-        );
-        final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
-        // close the target ref here otherwise it will hold a refcount
-        segmentReplicationTargetReplicationRef.close();
+        SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId());
         assertNotNull(segmentReplicationTarget);
+        assertEquals(SegmentReplicationState.Stage.GET_FILES, segmentReplicationTarget.state().getStage());
         assertTrue(segmentReplicationTarget.refCount() > 0);
-        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
-        assertBusy(() -> {
-            assertTrue(indexShard.routingEntry().primary());
-            assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
-            assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
-        });
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(INDEX_NAME)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
+        );
+        assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
+        assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
         unblockNode(REPOSITORY_NAME, replicaNode);
         cleanupRepo();
     }
@@ -85,7 +81,6 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
 
         final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
         final String replicaNode = getNode(dataNodeNames, false);
-        final String primaryNode = getNode(dataNodeNames, true);
 
         SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
         ensureGreen(INDEX_NAME);
@@ -94,22 +89,18 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
         indexSingleDoc();
         refresh(INDEX_NAME);
         waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
-        final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
-        assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
-        ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
-            state.getReplicationId()
-        );
-        final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
-        // close the target ref here otherwise it will hold a refcount
-        segmentReplicationTargetReplicationRef.close();
+        SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId());
         assertNotNull(segmentReplicationTarget);
+        assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, segmentReplicationTarget.state().getStage());
         assertTrue(segmentReplicationTarget.refCount() > 0);
-        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
-        assertBusy(() -> {
-            assertTrue(indexShard.routingEntry().primary());
-            assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
-            assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
-        });
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(INDEX_NAME)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
+        );
+        assertNull(targetService.get(indexShard.shardId()));
+        assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
         unblockNode(REPOSITORY_NAME, replicaNode);
         cleanupRepo();
     }

server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java

Lines changed: 18 additions & 28 deletions
@@ -8,16 +8,14 @@
 
 package org.opensearch.indices.replication;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.opensearch.action.ActionListenerResponseHandler;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.recovery.RecoverySettings;
-import org.opensearch.indices.recovery.RetryableTransportClient;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.TransportRequestOptions;
 import org.opensearch.transport.TransportService;
 
@@ -35,9 +33,7 @@
  */
 public class PrimaryShardReplicationSource implements SegmentReplicationSource {
 
-    private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class);
-
-    private final RetryableTransportClient transportClient;
+    private final TransportService transportService;
 
     private final DiscoveryNode sourceNode;
     private final DiscoveryNode targetNode;
@@ -52,12 +48,7 @@ public PrimaryShardReplicationSource(
         DiscoveryNode sourceNode
     ) {
         this.targetAllocationId = targetAllocationId;
-        this.transportClient = new RetryableTransportClient(
-            transportService,
-            sourceNode,
-            recoverySettings.internalActionRetryTimeout(),
-            logger
-        );
+        this.transportService = transportService;
         this.sourceNode = sourceNode;
         this.targetNode = targetNode;
         this.recoverySettings = recoverySettings;
@@ -69,10 +60,14 @@ public void getCheckpointMetadata(
         ReplicationCheckpoint checkpoint,
         ActionListener<CheckpointInfoResponse> listener
     ) {
-        final Writeable.Reader<CheckpointInfoResponse> reader = CheckpointInfoResponse::new;
-        final ActionListener<CheckpointInfoResponse> responseListener = ActionListener.map(listener, r -> r);
         final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, targetAllocationId, targetNode, checkpoint);
-        transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader);
+        transportService.sendRequest(
+            sourceNode,
+            GET_CHECKPOINT_INFO,
+            request,
+            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionRetryTimeout()).build(),
+            new ActionListenerResponseHandler<>(listener, CheckpointInfoResponse::new, ThreadPool.Names.GENERIC)
+        );
     }
 
     @Override
@@ -88,29 +83,24 @@ public void getSegmentFiles(
         // MultiFileWriter takes care of progress tracking for downloads in this scenario
         // TODO: Move state management and tracking into replication methods and use chunking and data
        // copy mechanisms only from MultiFileWriter
-        final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
-        final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);
         final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
             replicationId,
             targetAllocationId,
             targetNode,
             filesToFetch,
             checkpoint
         );
-        final TransportRequestOptions options = TransportRequestOptions.builder()
-            .withTimeout(recoverySettings.internalActionLongTimeout())
-            .build();
-        transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader);
+        transportService.sendRequest(
+            sourceNode,
+            GET_SEGMENT_FILES,
+            request,
+            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
+            new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.GENERIC)
+        );
     }
 
     @Override
     public String getDescription() {
         return sourceNode.getName();
     }
-
-    @Override
-    public void cancel() {
-        transportClient.cancel();
-    }
-
 }

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 10 additions & 12 deletions
@@ -83,6 +83,16 @@ protected void closeInternal() {
         }
     }
 
+    @Override
+    protected void onCancel(String reason) {
+        try {
+            notifyListener(new ReplicationFailedException(reason), false);
+        } finally {
+            source.cancel();
+            cancellableThreads.cancel(reason);
+        }
+    }
+
     @Override
     protected String getPrefix() {
         return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + ".";
@@ -320,16 +330,4 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
             }
         }
     }
-
-    /**
-     * Trigger a cancellation, this method will not close the target a subsequent call to #fail is required from target service.
-     */
-    @Override
-    public void cancel(String reason) {
-        if (finished.get() == false) {
-            logger.trace(new ParameterizedMessage("Cancelling replication for target {}", description()));
-            cancellableThreads.cancel(reason);
-            source.cancel();
-        }
-    }
 }

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 18 additions & 10 deletions
@@ -84,10 +84,6 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent
     private final ClusterService clusterService;
     private final TransportService transportService;
 
-    public ReplicationRef<SegmentReplicationTarget> get(long replicationId) {
-        return onGoingReplications.get(replicationId);
-    }
-
     /**
      * The internal actions
      *
@@ -158,6 +154,7 @@ protected void doStart() {
     @Override
     protected void doStop() {
         if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
+            assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown";
             clusterService.removeListener(this);
         }
     }
@@ -201,7 +198,7 @@ public void clusterChanged(ClusterChangedEvent event) {
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
         if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) {
-            onGoingReplications.requestCancel(indexShard.shardId(), "Shard closing");
+            onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing");
             latestReceivedCheckpoint.remove(shardId);
         }
     }
@@ -223,7 +220,7 @@ public void afterIndexShardStarted(IndexShard indexShard) {
     @Override
     public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
         if (oldRouting != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) {
-            onGoingReplications.requestCancel(indexShard.shardId(), "Shard has been promoted to primary");
+            onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary");
             latestReceivedCheckpoint.remove(indexShard.shardId());
         }
     }
@@ -255,6 +252,14 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) {
             .orElseGet(() -> getlatestCompletedEventSegmentReplicationState(shardId));
     }
 
+    public ReplicationRef<SegmentReplicationTarget> get(long replicationId) {
+        return onGoingReplications.get(replicationId);
+    }
+
+    public SegmentReplicationTarget get(ShardId shardId) {
+        return onGoingReplications.getOngoingReplicationTarget(shardId);
+    }
+
     /**
      * Invoked when a new checkpoint is received from a primary shard.
      * It checks if a new checkpoint should be processed or not and starts replication if needed.
@@ -454,7 +459,13 @@ protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Threa
                     latestPublishedCheckpoint
                 )
            );
-            Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
+            Runnable runnable = () -> {
+                // if we retry ensure the shard is not in the process of being closed.
+                // it will be removed from indexService's collection before the shard is actually marked as closed.
+                if (indicesService.getShardOrNull(replicaShard.shardId()) != null) {
+                    onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
+                }
+            };
             // Checks if we are using same thread and forks if necessary.
             if (thread == Thread.currentThread()) {
                 threadPool.generic().execute(runnable);
@@ -548,9 +559,6 @@ public ReplicationRunner(long replicationId) {
 
         @Override
         public void onFailure(Exception e) {
-            try (final ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.get(replicationId)) {
-                logger.error(() -> new ParameterizedMessage("Error during segment replication, {}", ref.get().description()), e);
-            }
            onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false);
         }
 