Skip to content

Commit 4fd7ee8

Browse files
[Segment Replication] Remove primary targets from replication tracker (#11011) (#11012)
(cherry picked from commit 9c65350) Signed-off-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent cded50f commit 4fd7ee8

File tree

2 files changed

+92
-0
lines changed

2 files changed

+92
-0
lines changed

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.util.List;
7272
import java.util.Map;
7373
import java.util.Objects;
74+
import java.util.Optional;
7475
import java.util.OptionalLong;
7576
import java.util.Set;
7677
import java.util.concurrent.atomic.AtomicLong;
@@ -1229,6 +1230,14 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
12291230
return this.latestReplicationCheckpoint;
12301231
}
12311232

1233+
private boolean isPrimaryRelocation(String allocationId) {
1234+
Optional<ShardRouting> shardRouting = routingTable.shards()
1235+
.stream()
1236+
.filter(routing -> routing.allocationId().getId().equals(allocationId))
1237+
.findAny();
1238+
return shardRouting.isPresent() && shardRouting.get().primary();
1239+
}
1240+
12321241
private void createReplicationLagTimers() {
12331242
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
12341243
final String allocationId = entry.getKey();
@@ -1238,6 +1247,7 @@ private void createReplicationLagTimers() {
12381247
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
12391248
if (cps.inSync
12401249
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
1250+
&& isPrimaryRelocation(allocationId) == false
12411251
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
12421252
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
12431253
logger.trace(
@@ -1269,6 +1279,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
12691279
final CheckpointState cps = e.getValue();
12701280
if (cps.inSync
12711281
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
1282+
&& isPrimaryRelocation(e.getKey()) == false
12721283
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
12731284
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
12741285
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
@@ -1293,6 +1304,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
12931304
entry -> entry.getKey().equals(this.shardAllocationId) == false
12941305
&& entry.getValue().inSync
12951306
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
1307+
&& isPrimaryRelocation(entry.getKey()) == false
12961308
)
12971309
.map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
12981310
.collect(Collectors.toUnmodifiableSet());

server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1907,6 +1907,86 @@ public void testSegmentReplicationCheckpointTracking() {
19071907
}
19081908
}
19091909

1910+
public void testSegmentReplicationCheckpointForRelocatingPrimary() {
1911+
Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
1912+
final long initialClusterStateVersion = randomNonNegativeLong();
1913+
final int numberOfActiveAllocationsIds = randomIntBetween(2, 2);
1914+
final int numberOfInitializingIds = randomIntBetween(2, 2);
1915+
final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
1916+
numberOfActiveAllocationsIds,
1917+
numberOfInitializingIds
1918+
);
1919+
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
1920+
final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
1921+
1922+
AllocationId targetAllocationId = initializingIds.iterator().next();
1923+
AllocationId primaryId = activeAllocationIds.iterator().next();
1924+
String relocatingToNodeId = nodeIdFromAllocationId(targetAllocationId);
1925+
1926+
logger.info("--> activeAllocationIds {} Primary {}", activeAllocationIds, primaryId.getId());
1927+
logger.info("--> initializingIds {} Target {}", initializingIds, targetAllocationId);
1928+
1929+
final ShardId shardId = new ShardId("test", "_na_", 0);
1930+
final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
1931+
for (final AllocationId initializingId : initializingIds) {
1932+
boolean primaryRelocationTarget = initializingId.equals(targetAllocationId);
1933+
builder.addShard(
1934+
TestShardRouting.newShardRouting(
1935+
shardId,
1936+
nodeIdFromAllocationId(initializingId),
1937+
null,
1938+
primaryRelocationTarget,
1939+
ShardRoutingState.INITIALIZING,
1940+
initializingId
1941+
)
1942+
);
1943+
}
1944+
builder.addShard(
1945+
TestShardRouting.newShardRouting(
1946+
shardId,
1947+
nodeIdFromAllocationId(primaryId),
1948+
relocatingToNodeId,
1949+
true,
1950+
ShardRoutingState.STARTED,
1951+
primaryId
1952+
)
1953+
);
1954+
IndexShardRoutingTable routingTable = builder.build();
1955+
final ReplicationTracker tracker = newTracker(primaryId, settings);
1956+
tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
1957+
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
1958+
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
1959+
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
1960+
assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
1961+
initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
1962+
1963+
final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 5L, "abcd", Version.LATEST);
1964+
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
1965+
tracker.shardId(),
1966+
0L,
1967+
1,
1968+
1,
1969+
5L,
1970+
Codec.getDefault().getName(),
1971+
Map.of("segment_1", segment_1)
1972+
);
1973+
tracker.setLatestReplicationCheckpoint(initialCheckpoint);
1974+
tracker.startReplicationLagTimers(initialCheckpoint);
1975+
1976+
final Set<String> expectedIds = initializingIds.stream()
1977+
.filter(id -> id.equals(targetAllocationId))
1978+
.map(AllocationId::getId)
1979+
.collect(Collectors.toSet());
1980+
1981+
Set<SegmentReplicationShardStats> groupStats = tracker.getSegmentReplicationStats();
1982+
assertEquals(expectedIds.size(), groupStats.size());
1983+
for (SegmentReplicationShardStats shardStat : groupStats) {
1984+
assertEquals(1, shardStat.getCheckpointsBehindCount());
1985+
assertEquals(5L, shardStat.getBytesBehindCount());
1986+
assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
1987+
}
1988+
}
1989+
19101990
public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
19111991
Settings settings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
19121992
final long initialClusterStateVersion = randomNonNegativeLong();

0 commit comments

Comments
 (0)