
Commit 7dad063

[Segment Replication] Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode. (opensearch-project#4182)
* Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode. This change fixes segrep to work with multiple shards per node by keying ongoing replications on allocation ID. It also updates the cancel methods to ensure state is properly cleared on shard cancel.

* Clean up cancel methods.

Signed-off-by: Marc Handalian <handalm@amazon.com>
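For intuition, here is a minimal standalone sketch (hypothetical names, not the OpenSearch API) of why the map key matters: a map keyed by DiscoveryNode can track at most one ongoing replication per target node, so a second shard copy allocated to the same node is rejected, while a map keyed by allocation ID tracks one handler per shard copy.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the change: handlers keyed by node vs. by allocation ID.
public class AllocationIdKeyingSketch {
    record Handler(String targetNode, String allocationId) {}

    public static void main(String[] args) {
        // Two shard copies of the same index are allocated to the same target node.
        Handler shard0 = new Handler("node-B", "alloc-0");
        Handler shard1 = new Handler("node-B", "alloc-1");

        // Keyed by node: the second putIfAbsent is rejected, so only one replication per node can be tracked.
        Map<String, Handler> byNode = new HashMap<>();
        byNode.putIfAbsent(shard0.targetNode(), shard0);
        boolean secondRejected = byNode.putIfAbsent(shard1.targetNode(), shard1) != null;
        System.out.println("keyed by node, second shard rejected: " + secondRejected); // true

        // Keyed by allocation ID: both shard copies get their own handler.
        Map<String, Handler> byAllocationId = new HashMap<>();
        byAllocationId.putIfAbsent(shard0.allocationId(), shard0);
        byAllocationId.putIfAbsent(shard1.allocationId(), shard1);
        System.out.println("keyed by allocation ID, handlers tracked: " + byAllocationId.size()); // 2
    }
}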
1 parent 1c3f034 commit 7dad063

File tree

5 files changed: +166, -40 lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 58 additions & 8 deletions
@@ -111,6 +111,54 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
        }
    }

+    public void testMultipleShards() throws Exception {
+        Settings indexSettings = Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+        createIndex(INDEX_NAME, indexSettings);
+        ensureGreen(INDEX_NAME);
+
+        final int initialDocCount = scaledRandomIntBetween(1, 200);
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
+            waitForDocs(initialDocCount, indexer);
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            final int additionalDocCount = scaledRandomIntBetween(0, 200);
+            final int expectedHitCount = initialDocCount + additionalDocCount;
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            flushAndRefresh(INDEX_NAME);
+            waitForReplicaUpdate();
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
+        }
+    }
+
    public void testReplicationAfterForceMerge() throws Exception {
        final String nodeA = internalCluster().startNode();
        final String nodeB = internalCluster().startNode();
@@ -262,15 +310,17 @@ private void waitForReplicaUpdate() throws Exception {
            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
-
+            // if we don't have any segments yet, proceed.
            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
-            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
-            final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
-            for (ShardSegments shardSegments : replicaShardSegments) {
-                final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
-                    .stream()
-                    .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
-                assertTrue(isReplicaCaughtUpToPrimary);
+            if (primaryShardSegments.getSegments().isEmpty() == false) {
+                final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+                final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
+                for (ShardSegments shardSegments : replicaShardSegments) {
+                    final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
+                        .stream()
+                        .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
+                    assertTrue(isReplicaCaughtUpToPrimary);
+                }
            }
        }
    });

server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java

Lines changed: 49 additions & 30 deletions
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;

/**
 * Manages references to ongoing segrep events on a node.
@@ -38,7 +41,7 @@ class OngoingSegmentReplications {
    private final RecoverySettings recoverySettings;
    private final IndicesService indicesService;
    private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
-    private final Map<DiscoveryNode, SegmentReplicationSourceHandler> nodesToHandlers;
+    private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;

    /**
     * Constructor.
@@ -50,7 +53,7 @@ class OngoingSegmentReplications {
        this.indicesService = indicesService;
        this.recoverySettings = recoverySettings;
        this.copyStateMap = Collections.synchronizedMap(new HashMap<>());
-        this.nodesToHandlers = ConcurrentCollections.newConcurrentMap();
+        this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap();
    }

    /**
@@ -96,8 +99,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException {
     * @param listener {@link ActionListener} that resolves when sending files is complete.
     */
    void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
-        final DiscoveryNode node = request.getTargetNode();
-        final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node);
+        final SegmentReplicationSourceHandler handler = allocationIdToHandlers.get(request.getTargetAllocationId());
        if (handler != null) {
            if (handler.isReplicating()) {
                throw new OpenSearchException(
@@ -108,7 +110,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
            }
        }
        // update the given listener to release the CopyState before it resolves.
        final ActionListener<GetSegmentFilesResponse> wrappedListener = ActionListener.runBefore(listener, () -> {
-            final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node);
+            final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId());
            if (sourceHandler != null) {
                removeCopyState(sourceHandler.getCopyState());
            }
@@ -123,19 +125,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
        }
    }

-    /**
-     * Cancel any ongoing replications for a given {@link DiscoveryNode}
-     *
-     * @param node {@link DiscoveryNode} node for which to cancel replication events.
-     */
-    void cancelReplication(DiscoveryNode node) {
-        final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node);
-        if (handler != null) {
-            handler.cancel("Cancel on node left");
-            removeCopyState(handler.getCopyState());
-        }
-    }
-
    /**
     * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
     * nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
@@ -149,9 +138,9 @@ void cancelReplication(DiscoveryNode node) {
     */
    CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
        final CopyState copyState = getCachedCopyState(request.getCheckpoint());
-        if (nodesToHandlers.putIfAbsent(
-            request.getTargetNode(),
-            createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter)
+        if (allocationIdToHandlers.putIfAbsent(
+            request.getTargetAllocationId(),
+            createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter)
        ) != null) {
            throw new OpenSearchException(
                "Shard copy {} on node {} already replicating",
@@ -163,18 +152,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
    }

    /**
-     * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down.
+     * Cancel all Replication events for the given shard, intended to be called when a primary is shutting down.
     *
     * @param shard {@link IndexShard}
     * @param reason {@link String} - Reason for the cancel
     */
    synchronized void cancel(IndexShard shard, String reason) {
-        for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) {
-            if (entry.getCopyState().getShard().equals(shard)) {
-                entry.cancel(reason);
-            }
-        }
-        copyStateMap.clear();
+        cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
+    }
+
+    /**
+     * Cancel any ongoing replications for a given {@link DiscoveryNode}
+     *
+     * @param node {@link DiscoveryNode} node for which to cancel replication events.
+     */
+    void cancelReplication(DiscoveryNode node) {
+        cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");
+
    }

    /**
@@ -186,19 +180,25 @@ boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
    }

    int size() {
-        return nodesToHandlers.size();
+        return allocationIdToHandlers.size();
    }

    int cachedCopyStateSize() {
        return copyStateMap.size();
    }

-    private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
+    private SegmentReplicationSourceHandler createTargetHandler(
+        DiscoveryNode node,
+        CopyState copyState,
+        String allocationId,
+        FileChunkWriter fileChunkWriter
+    ) {
        return new SegmentReplicationSourceHandler(
            node,
            fileChunkWriter,
            copyState.getShard().getThreadPool(),
            copyState,
+            allocationId,
            Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
            recoverySettings.getMaxConcurrentFileChunks()
        );
@@ -231,4 +231,23 @@ private synchronized void removeCopyState(CopyState copyState) {
            copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
        }
    }
+
+    /**
+     * Remove handlers from allocationIdToHandlers map based on a filter predicate.
+     * This will also decref the handler's CopyState reference.
+     */
+    private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> predicate, String reason) {
+        final List<String> allocationIds = allocationIdToHandlers.values()
+            .stream()
+            .filter(predicate)
+            .map(SegmentReplicationSourceHandler::getAllocationId)
+            .collect(Collectors.toList());
+        for (String allocationId : allocationIds) {
+            final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
+            if (handler != null) {
+                handler.cancel(reason);
+                removeCopyState(handler.getCopyState());
+            }
+        }
+    }
}
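The new cancelHandlers helper first snapshots the allocation IDs whose handlers match the predicate and only then removes and cancels each one, so the map is never mutated while it is being streamed over and the handler's CopyState is always released. A minimal standalone sketch of that snapshot-then-remove pattern (hypothetical Handler type, not the actual OpenSearch classes):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch of the snapshot-then-remove cancellation pattern used by cancelHandlers.
public class CancelHandlersSketch {
    record Handler(String allocationId, String targetNode) {
        void cancel(String reason) {
            System.out.println("cancelled " + allocationId + ": " + reason);
        }
    }

    private static final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    static void cancelHandlers(Predicate<Handler> predicate, String reason) {
        // Collect matching keys first so the map is not modified while streaming over its values.
        List<String> allocationIds = handlers.values()
            .stream()
            .filter(predicate)
            .map(Handler::allocationId)
            .collect(Collectors.toList());
        for (String allocationId : allocationIds) {
            Handler handler = handlers.remove(allocationId);
            if (handler != null) {
                handler.cancel(reason); // release any per-handler state here (e.g. a CopyState ref)
            }
        }
    }

    public static void main(String[] args) {
        handlers.put("alloc-0", new Handler("alloc-0", "node-B"));
        handlers.put("alloc-1", new Handler("alloc-1", "node-C"));
        // Cancel everything targeting node-B, e.g. because that node left the cluster.
        cancelHandlers(h -> h.targetNode().equals("node-B"), "Node left");
        System.out.println("remaining handlers: " + handlers.size()); // 1
    }
}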

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java

Lines changed: 14 additions & 1 deletion
@@ -54,6 +54,8 @@ class SegmentReplicationSourceHandler {
    private final List<Closeable> resources = new CopyOnWriteArrayList<>();
    private final Logger logger;
    private final AtomicBoolean isReplicating = new AtomicBoolean();
+    private final DiscoveryNode targetNode;
+    private final String allocationId;

    /**
     * Constructor.
@@ -70,9 +72,11 @@ class SegmentReplicationSourceHandler {
        FileChunkWriter writer,
        ThreadPool threadPool,
        CopyState copyState,
+        String allocationId,
        int fileChunkSizeInBytes,
        int maxConcurrentFileChunks
    ) {
+        this.targetNode = targetNode;
        this.shard = copyState.getShard();
        this.logger = Loggers.getLogger(
            SegmentReplicationSourceHandler.class,
@@ -89,6 +93,7 @@ class SegmentReplicationSourceHandler {
            fileChunkSizeInBytes,
            maxConcurrentFileChunks
        );
+        this.allocationId = allocationId;
        this.copyState = copyState;
    }

@@ -118,7 +123,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
            logger.debug(
                "delaying replication of {} as it is not listed as assigned to target node {}",
                shard.shardId(),
-                request.getTargetNode()
+                targetNode
            );
            throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
        }
@@ -175,4 +180,12 @@ CopyState getCopyState() {
    public boolean isReplicating() {
        return isReplicating.get();
    }
+
+    public DiscoveryNode getTargetNode() {
+        return targetNode;
+    }
+
+    public String getAllocationId() {
+        return allocationId;
+    }
}

server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java

Lines changed: 41 additions & 1 deletion
@@ -155,6 +155,9 @@ public void testCancelReplication() throws IOException {
    }

    public void testMultipleReplicasUseSameCheckpoint() throws IOException {
+        IndexShard secondReplica = newShard(primary.shardId(), false);
+        recoverReplica(secondReplica, primary, true);
+
        OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
        final CheckpointInfoRequest request = new CheckpointInfoRequest(
            1L,
@@ -172,7 +175,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {

        final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
            1L,
-            replica.routingEntry().allocationId().getId(),
+            secondReplica.routingEntry().allocationId().getId(),
            replicaDiscoveryNode,
            testCheckpoint
        );
@@ -187,6 +190,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {
        assertEquals(0, copyState.refCount());
        assertEquals(0, replications.size());
        assertEquals(0, replications.cachedCopyStateSize());
+        closeShards(secondReplica);
    }

    public void testStartCopyWithoutPrepareStep() {
@@ -272,4 +276,40 @@ public void onFailure(Exception e) {
            }
        });
    }
+
+    public void testCancelAllReplicationsForShard() throws IOException {
+        // This tests when primary has multiple ongoing replications.
+        IndexShard replica_2 = newShard(primary.shardId(), false);
+        recoverReplica(replica_2, primary, true);
+
+        OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            primaryDiscoveryNode,
+            testCheckpoint
+        );
+
+        final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
+        assertEquals(1, copyState.refCount());
+
+        final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
+            1L,
+            replica_2.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            testCheckpoint
+        );
+        replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));
+
+        assertEquals(2, copyState.refCount());
+        assertEquals(2, replications.size());
+        assertEquals(1, replications.cachedCopyStateSize());
+
+        // cancel the primary's ongoing replications.
+        replications.cancel(primary, "Test");
+        assertEquals(0, copyState.refCount());
+        assertEquals(0, replications.size());
+        assertEquals(0, replications.cachedCopyStateSize());
+        closeShards(replica_2);
+    }
}
server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java

Lines changed: 4 additions & 0 deletions
@@ -66,6 +66,7 @@ public void testSendFiles() throws IOException {
            chunkWriter,
            threadPool,
            copyState,
+            primary.routingEntry().allocationId().getId(),
            5000,
            1
        );
@@ -103,6 +104,7 @@ public void testSendFiles_emptyRequest() throws IOException {
            chunkWriter,
            threadPool,
            copyState,
+            primary.routingEntry().allocationId().getId(),
            5000,
            1
        );
@@ -141,6 +143,7 @@ public void testSendFileFails() throws IOException {
            chunkWriter,
            threadPool,
            copyState,
+            primary.routingEntry().allocationId().getId(),
            5000,
            1
        );
@@ -178,6 +181,7 @@ public void testReplicationAlreadyRunning() throws IOException {
            chunkWriter,
            threadPool,
            copyState,
+            primary.routingEntry().allocationId().getId(),
            5000,
            1
        );
