Skip to content

Commit 42d6136

Browse files
committed
Full cluster replication POC
1 parent 40eb0bd commit 42d6136

File tree

55 files changed

+2228
-32
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2228
-32
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,18 @@
439439
import org.opensearch.rest.action.search.RestMultiSearchAction;
440440
import org.opensearch.rest.action.search.RestSearchAction;
441441
import org.opensearch.rest.action.search.RestSearchScrollAction;
442+
import org.opensearch.xreplication.actions.index.StartCCRIndexTaskAction;
443+
import org.opensearch.xreplication.actions.index.TransportStartCCRIndexTaskAction;
444+
import org.opensearch.xreplication.actions.start.RestStartCCRAction;
445+
import org.opensearch.xreplication.actions.start.StartCCRAction;
446+
import org.opensearch.xreplication.actions.start.TransportStartCCRAction;
442447
import org.opensearch.tasks.Task;
443448
import org.opensearch.threadpool.ThreadPool;
444449
import org.opensearch.usage.UsageService;
450+
import org.opensearch.xreplication.actions.followers.StartCCRFollowerTaskAction;
451+
import org.opensearch.xreplication.actions.followers.TransportStartCCRFollowerTaskAction;
452+
import org.opensearch.xreplication.actions.syncsegments.SyncFromLeaderAction;
453+
import org.opensearch.xreplication.actions.syncsegments.TransportSyncFromLeaderAction;
445454

446455
import java.util.ArrayList;
447456
import java.util.Collections;
@@ -731,6 +740,14 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
731740
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
732741
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
733742
actions.register(DeleteDecommissionStateAction.INSTANCE, TransportDeleteDecommissionStateAction.class);
743+
744+
745+
// Tansport actions for Full Cluster Replication
746+
actions.register(StartCCRAction.INSTANCE, TransportStartCCRAction.class);
747+
actions.register(StartCCRFollowerTaskAction.INSTANCE, TransportStartCCRFollowerTaskAction.class);
748+
actions.register(StartCCRIndexTaskAction.INSTANCE, TransportStartCCRIndexTaskAction.class);
749+
actions.register(SyncFromLeaderAction.INSTANCE, TransportSyncFromLeaderAction.class);
750+
734751
return unmodifiableMap(actions.getRegistry());
735752
}
736753

@@ -915,6 +932,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
915932
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
916933
registerHandler.accept(new RestDeleteDecommissionStateAction());
917934

935+
// xcluster
936+
registerHandler.accept(new RestStartCCRAction());
937+
918938
for (ActionPlugin plugin : actionPlugins) {
919939
for (RestHandler handler : plugin.getRestHandlers(
920940
settings,

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,16 @@ public Iterator<Setting<?>> settings() {
495495
Property.IndexScope,
496496
Property.Final
497497
);
498+
public static final String CCR_REPLICATING_FROM_INDEX = "index.replication.fcr.replicating_from";
499+
public static final Setting<String> CCR_REPLICATING_FROM_INDEX_SETTING = new Setting<>(CCR_REPLICATING_FROM_INDEX, "", Function.identity(), Property.IndexScope);
500+
501+
public static final String CCR_REMOTE_PATH = "index.replication.fcr.remote_path";
502+
public static final Setting<String> CCR_REMOTE_PATH_SETTING = new Setting<>(
503+
CCR_REMOTE_PATH,
504+
"",
505+
Function.identity(),
506+
Property.IndexScope
507+
);
498508

499509
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
500510
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
196196
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
197197
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
198198
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
199+
IndexMetadata.CCR_REPLICATING_FROM_INDEX_SETTING,
199200

200201
// validate that built-in similarities don't get redefined
201202
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ public class FeatureFlags {
2727
*/
2828
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";
2929

30+
/**
31+
* Gates the functionality of full cluster replication.
32+
* Once the feature is ready for production release, this feature flag can be removed.
33+
*/
34+
public static final String X_REPLICATION = "opensearch.experimental.feature.full_cluster_replication.enabled";
35+
3036
/**
3137
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
3238
* Once the feature is ready for production release, this feature flag can be removed.
@@ -75,6 +81,7 @@ public static void initializeFeatureFlags(Settings openSearchSettings) {
7581
* and false otherwise.
7682
*/
7783
public static boolean isEnabled(String featureFlagName) {
84+
if (REPLICATION_TYPE.equals(featureFlagName) || REMOTE_STORE.equals(featureFlagName) || X_REPLICATION.equals(featureFlagName)) return true;
7885
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
7986
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
8087
return true;

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.function.Function;
6363
import java.util.function.UnaryOperator;
6464

65+
import static org.opensearch.cluster.metadata.IndexMetadata.CCR_REMOTE_PATH_SETTING;
6566
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
6667
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
6768
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
@@ -587,6 +588,8 @@ public final class IndexSettings {
587588
private final int numberOfShards;
588589
private final ReplicationType replicationType;
589590
private final boolean isRemoteStoreEnabled;
591+
private final boolean isCCRReplicatingIndex;
592+
private final String ccrReplicatingFrom;
590593
private final boolean isRemoteTranslogStoreEnabled;
591594
private final TimeValue remoteTranslogUploadBufferInterval;
592595
private final String remoteStoreTranslogRepository;
@@ -761,6 +764,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
761764
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
762765
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
763766
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
767+
ccrReplicatingFrom = settings.get(IndexMetadata.CCR_REPLICATING_FROM_INDEX, "");
768+
isCCRReplicatingIndex = !Strings.isNullOrEmpty(ccrReplicatingFrom);
764769
remoteTranslogUploadBufferInterval = settings.getAsTime(
765770
IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
766771
TimeValue.timeValueMillis(100)
@@ -1024,6 +1029,13 @@ public boolean isRemoteStoreEnabled() {
10241029
return isRemoteStoreEnabled;
10251030
}
10261031

1032+
public boolean isCCRReplicatingIndex() {
1033+
return isCCRReplicatingIndex;
1034+
}
1035+
1036+
public String getCCRReplicatingFrom() {
1037+
return ccrReplicatingFrom;
1038+
}
10271039
/**
10281040
* Returns remote store repository configured for this index.
10291041
*/
@@ -1565,4 +1577,8 @@ private void setMergeOnFlushPolicy(String policy) {
15651577
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
15661578
return Optional.ofNullable(mergeOnFlushPolicy);
15671579
}
1580+
1581+
public String getCCRRemotePath() {
1582+
return CCR_REMOTE_PATH_SETTING.get(settings);
1583+
}
15681584
}

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,11 @@ public boolean isReadOnlyReplica() {
408408
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
409409
}
410410

411+
412+
public boolean isReadOnlyPrimary() {
413+
return indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled() && indexSettings.isCCRReplicatingIndex() && !isReadOnlyReplica;
414+
}
415+
411416
/**
412417
* Returns the underlying primaryModeSupplier.
413418
* @return the primary mode supplier.

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ public InternalTranslogManager translogManager() {
256256
throttle = new IndexThrottle();
257257
try {
258258
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
259+
//TODO: fix this. Reading translogUUID from segments won't work if the segments are from a different cluster(CCR).
259260
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
260261
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
261262
TranslogEventListener internalTranslogEventListener = new TranslogEventListener() {

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
public class NRTReplicationEngineFactory implements EngineFactory {
1818
@Override
1919
public Engine newReadWriteEngine(EngineConfig config) {
20-
if (config.isReadOnlyReplica()) {
20+
// Load NRTReplicationEngine for primaries on CCR Follower as well.
21+
if (config.isReadOnlyReplica() || config.isReadOnlyPrimary()) {
2122
return new NRTReplicationEngine(config);
2223
}
2324
return new InternalEngine(config);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
import org.opensearch.index.store.StoreStats;
163163
import org.opensearch.index.translog.Translog;
164164
import org.opensearch.index.translog.TranslogConfig;
165+
import org.opensearch.index.translog.TranslogCorruptedException;
165166
import org.opensearch.index.translog.TranslogFactory;
166167
import org.opensearch.index.translog.TranslogStats;
167168
import org.opensearch.index.warmer.ShardIndexWarmerService;
@@ -2185,7 +2186,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21852186
assert currentEngineReference.get() == null : "engine is running";
21862187
verifyNotClosed();
21872188
if (indexSettings.isRemoteStoreEnabled()) {
2188-
syncSegmentsFromRemoteSegmentStore(false, true, false);
2189+
syncSegmentsFromRemoteSegmentStore(false, true, true);
21892190
}
21902191
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
21912192
final Engine newEngine = engineFactory.newReadWriteEngine(config);
@@ -3502,10 +3503,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
35023503

35033504
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
35043505
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
3505-
if (isRemoteStoreEnabled()) {
3506+
// Skip SegRep refresh listener for CCR follower indices.
3507+
if (isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
35063508
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher));
35073509
}
3508-
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled()) {
3510+
// Skip SegRep refresh listener for CCR leader as well as follower indices.
3511+
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
35093512
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
35103513
}
35113514

@@ -4352,7 +4355,7 @@ public void close() throws IOException {
43524355
};
43534356
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
43544357
if (indexSettings.isRemoteStoreEnabled()) {
4355-
syncSegmentsFromRemoteSegmentStore(false, true, false);
4358+
syncSegmentsFromRemoteSegmentStore(false, true, true);
43564359
}
43574360
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43584361
onNewEngine(newEngineReference.get());
@@ -4456,13 +4459,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
44564459
indexInput,
44574460
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
44584461
);
4462+
44594463
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
44604464
if (shouldCommit) {
4461-
finalizeReplication(infosSnapshot);
4462-
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
4465+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
44634466
}
44644467
else {
4465-
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
4468+
// We don't need to trigger a commit for segment copy from primaries(like SegRep with remote store, CCR)
4469+
finalizeReplication(infosSnapshot);
4470+
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
44664471
}
44674472
}
44684473
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.opensearch.common.inject.Inject;
1212
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
1313
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
14-
14+
import org.opensearch.xreplication.actions.notifysecondary.NotifyCCRFollowersAction;
1515

1616
/**
1717
* Hook to publish notification after primary uploads segments to the remote store.
@@ -20,18 +20,22 @@
2020
*/
2121
public class RemoteStoreSegmentUploadNotificationPublisher {
2222
private final SegmentReplicationCheckpointPublisher segRepPublisher;
23+
private final NotifyCCRFollowersAction xReplicatePublisher;
2324

2425
@Inject
25-
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher) {
26+
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher, NotifyCCRFollowersAction xReplicatePublisher) {
2627
this.segRepPublisher = segRepPublisher;
28+
this.xReplicatePublisher = xReplicatePublisher;
2729
}
2830

31+
// Notify replicas and CCR followers after the segments have been uploaded to the remote store by primary(during refresh).
2932
public void notifySegmentUpload(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
30-
// TODO: Add separate publisher for CCR.
3133
// we don't call indexShard.getLatestReplicationCheckpoint() as it might have a newer refreshed checkpoint.
3234
// Instead we send the one which has been uploaded to remote store.
35+
// TODO: Parallise both the notifications.
3336
if (segRepPublisher != null) segRepPublisher.publish(indexShard, checkpoint);
37+
if (xReplicatePublisher != null) xReplicatePublisher.publish(indexShard, checkpoint);
3438
}
3539

36-
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null);
40+
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null, null);
3741
}

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
457457
remoteStore.incRef();
458458
try {
459459
// Download segments from remote segment store
460-
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
460+
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
461461

462462
if (store.directory().listAll().length == 0) {
463463
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw
4141
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
4242
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
4343
BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath();
44-
commonBlobPath = commonBlobPath.add(indexSettings.getIndex().getUUID())
44+
String indexUUID = indexSettings.getIndex().getUUID();
45+
// override the follower's remote store path to leader index's uuid.
46+
if (indexSettings.isCCRReplicatingIndex()) {
47+
indexUUID = indexSettings.getCCRReplicatingFrom();
48+
}
49+
commonBlobPath = commonBlobPath.add(indexUUID)
4550
.add(String.valueOf(path.getShardId().getId()))
4651
.add("segments");
4752

server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
7373
ShardPath localShardPath,
7474
BlobStoreRepository blobStoreRepository
7575
) throws IOException {
76-
final BlobPath blobPath = blobStoreRepository.basePath()
77-
.add("indices")
78-
.add(IndexSettings.SEARCHABLE_SNAPSHOT_INDEX_ID.get(indexSettings.getSettings()))
79-
.add(Integer.toString(localShardPath.getShardId().getId()));
76+
final BlobPath blobPath = getBlobPath(indexSettings, localShardPath, blobStoreRepository);
8077
final SnapshotId snapshotId = new SnapshotId(
8178
IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME.get(indexSettings.getSettings()),
8279
IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID.get(indexSettings.getSettings())
@@ -94,4 +91,15 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
9491
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
9592
});
9693
}
94+
95+
private BlobPath getBlobPath(IndexSettings indexSettings, ShardPath localShardPath, BlobStoreRepository blobStoreRepository) {
96+
if(indexSettings.isCCRReplicatingIndex()) {
97+
return blobStoreRepository.basePath().add(indexSettings.getCCRRemotePath());
98+
} else {
99+
return blobStoreRepository.basePath()
100+
.add("indices")
101+
.add(IndexSettings.SEARCHABLE_SNAPSHOT_INDEX_ID.get(indexSettings.getSettings()))
102+
.add(Integer.toString(localShardPath.getShardId().getId()));
103+
}
104+
}
97105
}

server/src/main/java/org/opensearch/index/translog/TranslogHeader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.index.translog;
3434

35+
import org.apache.logging.log4j.Logger;
3536
import org.apache.lucene.codecs.CodecUtil;
3637
import org.apache.lucene.index.CorruptIndexException;
3738
import org.apache.lucene.index.IndexFormatTooNewException;
@@ -43,6 +44,7 @@
4344
import org.opensearch.common.io.stream.InputStreamStreamInput;
4445
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
4546
import org.opensearch.common.io.stream.StreamInput;
47+
import org.opensearch.common.logging.Loggers;
4648

4749
import java.io.EOFException;
4850
import java.io.IOException;
@@ -180,13 +182,16 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
180182

181183
// verify UUID only after checksum, to ensure that UUID is not corrupted
182184
final BytesRef expectedUUID = new BytesRef(translogUUID);
185+
Logger logger = Loggers.getLogger(TranslogHeader.class, "");
186+
// Skipping the check for now.
187+
/*
183188
if (uuid.bytesEquals(expectedUUID) == false) {
184189
throw new TranslogCorruptedException(
185190
path.toString(),
186191
"expected shard UUID " + expectedUUID + " but got: " + uuid + " this translog file belongs to a different translog"
187192
);
188193
}
189-
194+
*/
190195
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
191196
} catch (EOFException e) {
192197
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);

0 commit comments

Comments
 (0)