Skip to content

Commit 988284a

Browse files
authored
adding check to avoid parallel replication events taking place (opensearch-project#5831)
Signed-off-by: Poojita Raj <poojiraj@amazon.com> Signed-off-by: Poojita Raj <poojiraj@amazon.com>
1 parent 095c222 commit 988284a

File tree

1 file changed

+66
-58
lines changed

1 file changed

+66
-58
lines changed

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.common.util.concurrent.ConcurrentCollections;
2121
import org.opensearch.index.shard.IndexEventListener;
2222
import org.opensearch.index.shard.IndexShard;
23+
import org.opensearch.index.shard.IndexShardState;
2324
import org.opensearch.index.shard.ShardId;
2425
import org.opensearch.indices.IndicesService;
2526
import org.opensearch.indices.recovery.FileChunkRequest;
@@ -160,75 +161,82 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol
160161
*/
161162
public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
162163
logger.trace(() -> new ParameterizedMessage("Replica received new replication checkpoint from primary [{}]", receivedCheckpoint));
163-
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
164-
if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) {
165-
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
166-
latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint);
167-
}
168-
} else {
169-
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
170-
}
171-
SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
172-
if (ongoingReplicationTarget != null) {
173-
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
174-
logger.trace(
175-
"Cancelling ongoing replication from old primary with primary term {}",
176-
ongoingReplicationTarget.getCheckpoint().getPrimaryTerm()
177-
);
178-
onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary");
164+
// Checks if replica shard is in the correct STARTED state to process checkpoints (avoids parallel replication events taking place)
165+
if (replicaShard.state().equals(IndexShardState.STARTED) == true) {
166+
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
167+
if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) {
168+
if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) {
169+
latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint);
170+
}
179171
} else {
180-
logger.trace(
181-
() -> new ParameterizedMessage(
182-
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
183-
replicaShard.getLatestReplicationCheckpoint()
184-
)
185-
);
186-
return;
172+
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
187173
}
188-
}
189-
final Thread thread = Thread.currentThread();
190-
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
191-
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
192-
@Override
193-
public void onReplicationDone(SegmentReplicationState state) {
174+
SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
175+
if (ongoingReplicationTarget != null) {
176+
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
177+
logger.trace(
178+
"Cancelling ongoing replication from old primary with primary term {}",
179+
ongoingReplicationTarget.getCheckpoint().getPrimaryTerm()
180+
);
181+
onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary");
182+
} else {
194183
logger.trace(
195184
() -> new ParameterizedMessage(
196-
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
197-
replicaShard.shardId().getId(),
198-
state.getReplicationId(),
199-
state.getTimingData()
185+
"Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
186+
replicaShard.getLatestReplicationCheckpoint()
200187
)
201188
);
202-
// if we received a checkpoint during the copy event that is ahead of this
203-
// try and process it.
204-
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
205-
Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
206-
// Checks if we are using same thread and forks if necessary.
207-
if (thread == Thread.currentThread()) {
208-
threadPool.generic().execute(runnable);
209-
} else {
210-
runnable.run();
189+
return;
190+
}
191+
}
192+
final Thread thread = Thread.currentThread();
193+
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
194+
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
195+
@Override
196+
public void onReplicationDone(SegmentReplicationState state) {
197+
logger.trace(
198+
() -> new ParameterizedMessage(
199+
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
200+
replicaShard.shardId().getId(),
201+
state.getReplicationId(),
202+
state.getTimingData()
203+
)
204+
);
205+
// if we received a checkpoint during the copy event that is ahead of this
206+
// try and process it.
207+
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
208+
Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard);
209+
// Checks if we are using same thread and forks if necessary.
210+
if (thread == Thread.currentThread()) {
211+
threadPool.generic().execute(runnable);
212+
} else {
213+
runnable.run();
214+
}
211215
}
212216
}
213-
}
214217

215-
@Override
216-
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
217-
logger.trace(
218-
() -> new ParameterizedMessage(
219-
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
220-
replicaShard.shardId().getId(),
221-
state.getReplicationId(),
222-
state.getTimingData()
223-
)
224-
);
225-
if (sendShardFailure == true) {
226-
logger.error("replication failure", e);
227-
replicaShard.failShard("replication failure", e);
218+
@Override
219+
public void onReplicationFailure(
220+
SegmentReplicationState state,
221+
ReplicationFailedException e,
222+
boolean sendShardFailure
223+
) {
224+
logger.trace(
225+
() -> new ParameterizedMessage(
226+
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
227+
replicaShard.shardId().getId(),
228+
state.getReplicationId(),
229+
state.getTimingData()
230+
)
231+
);
232+
if (sendShardFailure == true) {
233+
logger.error("replication failure", e);
234+
replicaShard.failShard("replication failure", e);
235+
}
228236
}
229-
}
230-
});
237+
});
231238

239+
}
232240
}
233241
}
234242

0 commit comments

Comments
 (0)