Skip to content

Commit 40eb0bd

Browse files
committed
Allow replica to fetch segments from remote store instead of leader node
Signed-off-by: Ankit Kala <ankikala@amazon.com>
1 parent 2ce07f2 commit 40eb0bd

30 files changed

+329
-137
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,8 @@ public static final IndexShard newIndexShard(
678678
cbs,
679679
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
680680
SegmentReplicationCheckpointPublisher.EMPTY,
681-
null
681+
null,
682+
RemoteStoreSegmentUploadNotificationPublisher.EMPTY
682683
);
683684
}
684685

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.index.shard.IndexShard;
8282
import org.opensearch.index.shard.IndexShardClosedException;
8383
import org.opensearch.index.shard.IndexingOperationListener;
84+
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
8485
import org.opensearch.index.shard.SearchOperationListener;
8586
import org.opensearch.index.shard.ShardId;
8687
import org.opensearch.index.shard.ShardNotFoundException;
@@ -438,7 +439,8 @@ public synchronized IndexShard createShard(
438439
final ShardRouting routing,
439440
final Consumer<ShardId> globalCheckpointSyncer,
440441
final RetentionLeaseSyncer retentionLeaseSyncer,
441-
final SegmentReplicationCheckpointPublisher checkpointPublisher
442+
final SegmentReplicationCheckpointPublisher checkpointPublisher,
443+
final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
442444
) throws IOException {
443445
Objects.requireNonNull(retentionLeaseSyncer);
444446
/*
@@ -506,7 +508,8 @@ public synchronized IndexShard createShard(
506508
circuitBreakerService,
507509
translogFactorySupplier,
508510
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
509-
remoteStore
511+
remoteStore,
512+
remoteSegmentNotificationPublisher
510513
);
511514
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
512515
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
485485

486486
@Override
487487
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
488-
throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
488+
return this;
489489
}
490490

491491
@Override

server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void beforeRefresh() throws IOException {
4040

4141
@Override
4242
public void afterRefresh(boolean didRefresh) throws IOException {
43-
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
43+
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()
44+
&& !shard.indexSettings.isRemoteStoreEnabled()) {
4445
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
4546
}
4647
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ Runnable getGlobalCheckpointSyncer() {
327327

328328
private final Store remoteStore;
329329
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
330+
private final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher;
330331

331332
public IndexShard(
332333
final ShardRouting shardRouting,
@@ -351,7 +352,8 @@ public IndexShard(
351352
final CircuitBreakerService circuitBreakerService,
352353
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
353354
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
354-
@Nullable final Store remoteStore
355+
@Nullable final Store remoteStore,
356+
RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
355357
) throws IOException {
356358
super(shardRouting.shardId(), indexSettings);
357359
assert shardRouting.initializing();
@@ -403,6 +405,7 @@ public IndexShard(
403405
this.pendingPrimaryTerm = primaryTerm;
404406
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger);
405407
this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool);
408+
this.remoteSegmentNotificationPublisher = remoteSegmentNotificationPublisher;
406409
this.replicationTracker = new ReplicationTracker(
407410
shardId,
408411
aId,
@@ -2182,7 +2185,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21822185
assert currentEngineReference.get() == null : "engine is running";
21832186
verifyNotClosed();
21842187
if (indexSettings.isRemoteStoreEnabled()) {
2185-
syncSegmentsFromRemoteSegmentStore(false);
2188+
syncSegmentsFromRemoteSegmentStore(false, true, false);
21862189
}
21872190
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
21882191
final Engine newEngine = engineFactory.newReadWriteEngine(config);
@@ -3090,6 +3093,7 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
30903093
* When remote translog is enabled for an index, replication operation is limited to primary term validation and does not
30913094
* update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint.
30923095
*/
3096+
30933097
assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
30943098
|| indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint ["
30953099
+ globalCheckpoint
@@ -3499,11 +3503,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
34993503
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
35003504
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
35013505
if (isRemoteStoreEnabled()) {
3502-
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
3506+
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher));
35033507
}
3504-
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
3508+
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled()) {
35053509
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
35063510
}
3511+
35073512
/**
35083513
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
35093514
* change to a writeable engine during relocation handoff after a round of segment replication.
@@ -4078,7 +4083,7 @@ EngineConfigFactory getEngineConfigFactory() {
40784083
}
40794084

40804085
// for tests
4081-
ReplicationTracker getReplicationTracker() {
4086+
public ReplicationTracker getReplicationTracker() {
40824087
return replicationTracker;
40834088
}
40844089

@@ -4347,7 +4352,7 @@ public void close() throws IOException {
43474352
};
43484353
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
43494354
if (indexSettings.isRemoteStoreEnabled()) {
4350-
syncSegmentsFromRemoteSegmentStore(false);
4355+
syncSegmentsFromRemoteSegmentStore(false, true, false);
43514356
}
43524357
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43534358
onNewEngine(newEngineReference.get());
@@ -4380,23 +4385,14 @@ public void close() throws IOException {
43804385
onSettingsChanged();
43814386
}
43824387

4383-
/**
4384-
* Downloads segments from remote segment store. This method will download segments till
4385-
* last refresh checkpoint.
4386-
* @param overrideLocal flag to override local segment files with those in remote store
4387-
* @throws IOException if exception occurs while reading segments from remote store
4388-
*/
4389-
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
4390-
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
4391-
}
4392-
43934388
/**
43944389
* Downloads segments from remote segment store.
43954390
* @param overrideLocal flag to override local segment files with those in remote store
43964391
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
4392+
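* @param shouldCommit only consulted when a refresh-level segment infos snapshot was downloaded: if true, the snapshot is applied via finalizeReplication and the store is cleaned up while preserving the latest commit point; if false, the snapshot is committed to the local store via commitSegmentInfos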
43974393
* @throws IOException if exception occurs while reading segments from remote store
43984394
*/
4399-
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
4395+
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) throws IOException {
44004396
assert indexSettings.isRemoteStoreEnabled();
44014397
logger.info("Downloading segments from remote segment store");
44024398
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
@@ -4448,6 +4444,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
44484444
skippedSegments.add(file);
44494445
}
44504446
}
4447+
44514448
if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {
44524449
try (
44534450
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
@@ -4460,7 +4457,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
44604457
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
44614458
);
44624459
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
4463-
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
4460+
if (shouldCommit) {
4461+
finalizeReplication(infosSnapshot);
4462+
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
4463+
}
4464+
else {
4465+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
4466+
}
44644467
}
44654468
}
44664469
} catch (IOException e) {

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.opensearch.index.engine.InternalEngine;
2626
import org.opensearch.index.seqno.SequenceNumbers;
2727
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
28+
import org.opensearch.index.store.StoreFileMetadata;
29+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
2830

2931
import java.io.IOException;
3032
import java.util.Collection;
@@ -57,14 +59,16 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
5759
private final RemoteSegmentStoreDirectory remoteDirectory;
5860
private final Map<String, String> localSegmentChecksumMap;
5961
private long primaryTerm;
62+
private final RemoteStoreSegmentUploadNotificationPublisher notificationPublisher;
6063
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);
6164

62-
public RemoteStoreRefreshListener(IndexShard indexShard) {
65+
public RemoteStoreRefreshListener(IndexShard indexShard, RemoteStoreSegmentUploadNotificationPublisher notificationPublisher) {
6366
this.indexShard = indexShard;
6467
this.storeDirectory = indexShard.store().directory();
6568
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
6669
.getDelegate()).getDelegate();
6770
this.primaryTerm = indexShard.getOperationPrimaryTerm();
71+
this.notificationPublisher = notificationPublisher;
6872
localSegmentChecksumMap = new HashMap<>();
6973
if (indexShard.shardRouting.primary()) {
7074
try {
@@ -103,6 +107,9 @@ public void afterRefresh(boolean didRefresh) {
103107
deleteStaleCommits();
104108
}
105109

110+
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can move.
111+
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
112+
106113
String segmentInfoSnapshotFilename = null;
107114
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
108115
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
@@ -148,6 +155,10 @@ public void afterRefresh(boolean didRefresh) {
148155
.lastRefreshedCheckpoint();
149156
((InternalEngine) indexShard.getEngine()).translogManager()
150157
.setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
158+
159+
if (!RemoteStoreSegmentUploadNotificationPublisher.EMPTY.equals(notificationPublisher)) {
160+
notificationPublisher.notifySegmentUpload(indexShard, checkpoint);
161+
}
151162
}
152163
}
153164
} catch (EngineException e) {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.opensearch.common.inject.Inject;
12+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
13+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
14+
15+
16+
/**
17+
* Hook to publish notification after primary uploads segments to the remote store.
18+
*
19+
* @opensearch.internal
20+
*/
21+
public class RemoteStoreSegmentUploadNotificationPublisher {
22+
private final SegmentReplicationCheckpointPublisher segRepPublisher;
23+
24+
@Inject
25+
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher) {
26+
this.segRepPublisher = segRepPublisher;
27+
}
28+
29+
public void notifySegmentUpload(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
30+
// TODO: Add separate publisher for CCR.
31+
// we don't call indexShard.getLatestReplicationCheckpoint() as it might have a newer refreshed checkpoint.
32+
// Instead we send the one which has been uploaded to remote store.
33+
if (segRepPublisher != null) segRepPublisher.publish(indexShard, checkpoint);
34+
}
35+
36+
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null);
37+
}

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
457457
remoteStore.incRef();
458458
try {
459459
// Download segments from remote segment store
460-
indexShard.syncSegmentsFromRemoteSegmentStore(true);
460+
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
461461

462462
if (store.directory().listAll().length == 0) {
463463
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void init() throws IOException {
118118
* @return Map of segment filename to uploaded filename with checksum
119119
* @throws IOException if there were any failures in reading the metadata file
120120
*/
121-
private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
121+
public Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
122122
Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();
123123

124124
Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
@@ -149,7 +149,6 @@ private Map<String, UploadedSegmentMetadata> readMetadataFile(String metadataFil
149149
public static class UploadedSegmentMetadata {
150150
// Visible for testing
151151
static final String SEPARATOR = "::";
152-
153152
private final String originalFilename;
154153
private final String uploadedFilename;
155154
private final String checksum;
@@ -179,6 +178,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
179178
String[] values = uploadedFilename.split(SEPARATOR);
180179
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
181180
}
181+
182+
/**
 * Returns the value of the {@code originalFilename} field of this uploaded segment metadata.
 *
 * @return the original filename
 */
public String getOriginalFilename() {
183+
return originalFilename;
184+
}
182185
}
183186

184187
/**

server/src/main/java/org/opensearch/index/store/Store.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata>
399399
missing.add(value);
400400
} else {
401401
final StoreFileMetadata fileMetadata = target.get(value.name());
402-
if (fileMetadata.isSame(value)) {
402+
// match segments using checksum
403+
if (fileMetadata.checksum().equals(value.checksum())) {
403404
identical.add(value);
404405
} else {
405406
different.add(value);

server/src/main/java/org/opensearch/indices/IndicesModule.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.opensearch.index.seqno.RetentionLeaseSyncer;
7474
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
7575
import org.opensearch.index.shard.PrimaryReplicaSyncer;
76+
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
7677
import org.opensearch.indices.cluster.IndicesClusterStateService;
7778
import org.opensearch.indices.mapper.MapperRegistry;
7879
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
@@ -287,6 +288,11 @@ protected void configure() {
287288
} else {
288289
bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY);
289290
}
291+
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {
292+
bind(RemoteStoreSegmentUploadNotificationPublisher.class).asEagerSingleton();
293+
} else {
294+
bind(RemoteStoreSegmentUploadNotificationPublisher.class).toInstance(RemoteStoreSegmentUploadNotificationPublisher.EMPTY);
295+
}
290296
}
291297

292298
/**

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@
135135
import org.opensearch.index.shard.IndexingStats;
136136
import org.opensearch.index.shard.ShardId;
137137
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
138+
import org.opensearch.index.shard.RemoteStoreSegmentUploadNotificationPublisher;
138139
import org.opensearch.index.translog.InternalTranslogFactory;
139140
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
140141
import org.opensearch.index.translog.TranslogFactory;
@@ -161,7 +162,6 @@
161162
import org.opensearch.search.query.QueryPhase;
162163
import org.opensearch.search.query.QuerySearchResult;
163164
import org.opensearch.threadpool.ThreadPool;
164-
165165
import java.io.Closeable;
166166
import java.io.IOException;
167167
import java.io.InputStream;
@@ -1023,14 +1023,15 @@ public IndexShard createShard(
10231023
final Consumer<ShardId> globalCheckpointSyncer,
10241024
final RetentionLeaseSyncer retentionLeaseSyncer,
10251025
final DiscoveryNode targetNode,
1026-
final DiscoveryNode sourceNode
1026+
final DiscoveryNode sourceNode,
1027+
final RemoteStoreSegmentUploadNotificationPublisher remoteSegmentNotificationPublisher
10271028
) throws IOException {
10281029
Objects.requireNonNull(retentionLeaseSyncer);
10291030
ensureChangesAllowed();
10301031
IndexService indexService = indexService(shardRouting.index());
10311032
assert indexService != null;
10321033
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
1033-
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
1034+
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, remoteSegmentNotificationPublisher);
10341035
indexShard.addShardFailureCallback(onShardFailure);
10351036
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
10361037
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

0 commit comments

Comments
 (0)