Skip to content

Commit a4182e1

Browse files
committed
Full cluster replication POC
1 parent 40eb0bd commit a4182e1

File tree

56 files changed

+2267
-34
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2267
-34
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,18 @@
439439
import org.opensearch.rest.action.search.RestMultiSearchAction;
440440
import org.opensearch.rest.action.search.RestSearchAction;
441441
import org.opensearch.rest.action.search.RestSearchScrollAction;
442+
import org.opensearch.xreplication.actions.index.StartIndexTaskAction;
443+
import org.opensearch.xreplication.actions.index.TransportStartIndexTaskAction;
444+
import org.opensearch.xreplication.actions.start.RestXReplicateAction;
445+
import org.opensearch.xreplication.actions.start.StartXReplication;
446+
import org.opensearch.xreplication.actions.start.TransportXReplicateAction;
442447
import org.opensearch.tasks.Task;
443448
import org.opensearch.threadpool.ThreadPool;
444449
import org.opensearch.usage.UsageService;
450+
import org.opensearch.xreplication.actions.followers.StartFollowersAction;
451+
import org.opensearch.xreplication.actions.followers.TransportStartFollowerAction;
452+
import org.opensearch.xreplication.actions.syncsegments.SyncFromLeaderAction;
453+
import org.opensearch.xreplication.actions.syncsegments.TransportSyncFromLeaderAction;
445454

446455
import java.util.ArrayList;
447456
import java.util.Collections;
@@ -731,6 +740,14 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
731740
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
732741
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
733742
actions.register(DeleteDecommissionStateAction.INSTANCE, TransportDeleteDecommissionStateAction.class);
743+
744+
745+
// Full Cluster Replication
746+
actions.register(StartXReplication.INSTANCE, TransportXReplicateAction.class);
747+
actions.register(StartFollowersAction.INSTANCE, TransportStartFollowerAction.class);
748+
actions.register(StartIndexTaskAction.INSTANCE, TransportStartIndexTaskAction.class);
749+
actions.register(SyncFromLeaderAction.INSTANCE, TransportSyncFromLeaderAction.class);
750+
734751
return unmodifiableMap(actions.getRegistry());
735752
}
736753

@@ -915,6 +932,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
915932
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
916933
registerHandler.accept(new RestDeleteDecommissionStateAction());
917934

935+
// xcluster
936+
registerHandler.accept(new RestXReplicateAction());
937+
918938
for (ActionPlugin plugin : actionPlugins) {
919939
for (RestHandler handler : plugin.getRestHandlers(
920940
settings,

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,16 @@ public Iterator<Setting<?>> settings() {
495495
Property.IndexScope,
496496
Property.Final
497497
);
498+
public static final String CCR_REPLICATING_FROM_INDEX = "index.replication.fcr.replicating_from";
499+
public static final Setting<String> CCR_REPLICATING_FROM_INDEX_SETTING = new Setting<>(CCR_REPLICATING_FROM_INDEX, "", Function.identity(), Property.IndexScope);
500+
501+
public static final String CCR_REMOTE_PATH = "index.replication.fcr.remote_path";
502+
public static final Setting<String> CCR_REMOTE_PATH_SETTING = new Setting<>(
503+
CCR_REMOTE_PATH,
504+
"",
505+
Function.identity(),
506+
Property.IndexScope
507+
);
498508

499509
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
500510
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
196196
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
197197
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
198198
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
199+
IndexMetadata.CCR_REPLICATING_FROM_INDEX_SETTING,
199200

200201
// validate that built-in similarities don't get redefined
201202
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ public class FeatureFlags {
2727
*/
2828
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";
2929

30+
/**
31+
* Gates the functionality of full cluster replication.
32+
* Once the feature is ready for production release, this feature flag can be removed.
33+
*/
34+
public static final String X_REPLICATION = "opensearch.experimental.feature.full_cluster_replication.enabled";
35+
3036
/**
3137
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
3238
* Once the feature is ready for production release, this feature flag can be removed.
@@ -75,6 +81,7 @@ public static void initializeFeatureFlags(Settings openSearchSettings) {
7581
* and false otherwise.
7682
*/
7783
public static boolean isEnabled(String featureFlagName) {
84+
if (REPLICATION_TYPE.equals(featureFlagName) || REMOTE_STORE.equals(featureFlagName) || X_REPLICATION.equals(featureFlagName)) return true;
7885
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
7986
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
8087
return true;

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.function.Function;
6363
import java.util.function.UnaryOperator;
6464

65+
import static org.opensearch.cluster.metadata.IndexMetadata.CCR_REMOTE_PATH_SETTING;
6566
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
6667
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
6768
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
@@ -587,6 +588,8 @@ public final class IndexSettings {
587588
private final int numberOfShards;
588589
private final ReplicationType replicationType;
589590
private final boolean isRemoteStoreEnabled;
591+
private final boolean isCCRReplicatingIndex;
592+
private final String ccrReplicatingFrom;
590593
private final boolean isRemoteTranslogStoreEnabled;
591594
private final TimeValue remoteTranslogUploadBufferInterval;
592595
private final String remoteStoreTranslogRepository;
@@ -761,6 +764,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
761764
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
762765
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
763766
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
767+
ccrReplicatingFrom = settings.get(IndexMetadata.CCR_REPLICATING_FROM_INDEX, "");
768+
isCCRReplicatingIndex = !Strings.isNullOrEmpty(ccrReplicatingFrom);
764769
remoteTranslogUploadBufferInterval = settings.getAsTime(
765770
IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
766771
TimeValue.timeValueMillis(100)
@@ -1024,6 +1029,13 @@ public boolean isRemoteStoreEnabled() {
10241029
return isRemoteStoreEnabled;
10251030
}
10261031

1032+
public boolean isCCRReplicatingIndex() {
1033+
return isCCRReplicatingIndex;
1034+
}
1035+
1036+
public String getCCRReplicatingFrom() {
1037+
return ccrReplicatingFrom;
1038+
}
10271039
/**
10281040
* Returns remote store repository configured for this index.
10291041
*/
@@ -1565,4 +1577,8 @@ private void setMergeOnFlushPolicy(String policy) {
15651577
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
15661578
return Optional.ofNullable(mergeOnFlushPolicy);
15671579
}
1580+
1581+
public String getCCRRemotePath() {
1582+
return CCR_REMOTE_PATH_SETTING.get(settings);
1583+
}
15681584
}

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,12 @@ public boolean isReadOnlyReplica() {
408408
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
409409
}
410410

411+
412+
public boolean isReadOnlyPrimary() {
413+
//TODO: verify on segrep enabled.
414+
return indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled() && indexSettings.isCCRReplicatingIndex() && !isReadOnlyReplica;
415+
}
416+
411417
/**
412418
* Returns the underlying primaryModeSupplier.
413419
* @return the primary mode supplier.

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ public InternalTranslogManager translogManager() {
256256
throttle = new IndexThrottle();
257257
try {
258258
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
259+
//TODO: fix this. Reading translogUUID from segments won't work if the segments are from a different cluster(CCR).
259260
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
260261
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
261262
TranslogEventListener internalTranslogEventListener = new TranslogEventListener() {

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
public class NRTReplicationEngineFactory implements EngineFactory {
1818
@Override
1919
public Engine newReadWriteEngine(EngineConfig config) {
20-
if (config.isReadOnlyReplica()) {
20+
if (config.isReadOnlyReplica() || config.isReadOnlyPrimary()) {
2121
return new NRTReplicationEngine(config);
2222
}
2323
return new InternalEngine(config);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
import org.opensearch.index.store.StoreStats;
163163
import org.opensearch.index.translog.Translog;
164164
import org.opensearch.index.translog.TranslogConfig;
165+
import org.opensearch.index.translog.TranslogCorruptedException;
165166
import org.opensearch.index.translog.TranslogFactory;
166167
import org.opensearch.index.translog.TranslogStats;
167168
import org.opensearch.index.warmer.ShardIndexWarmerService;
@@ -2185,7 +2186,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21852186
assert currentEngineReference.get() == null : "engine is running";
21862187
verifyNotClosed();
21872188
if (indexSettings.isRemoteStoreEnabled()) {
2188-
syncSegmentsFromRemoteSegmentStore(false, true, false);
2189+
syncSegmentsFromRemoteSegmentStore(false, true, true);
21892190
}
21902191
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
21912192
final Engine newEngine = engineFactory.newReadWriteEngine(config);
@@ -3502,10 +3503,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
35023503

35033504
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
35043505
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
3505-
if (isRemoteStoreEnabled()) {
3506+
if (isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
35063507
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteSegmentNotificationPublisher));
35073508
}
3508-
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled()) {
3509+
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary() && !indexSettings.isRemoteStoreEnabled() && !indexSettings.isCCRReplicatingIndex()) {
35093510
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
35103511
}
35113512

@@ -4352,7 +4353,7 @@ public void close() throws IOException {
43524353
};
43534354
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
43544355
if (indexSettings.isRemoteStoreEnabled()) {
4355-
syncSegmentsFromRemoteSegmentStore(false, true, false);
4356+
syncSegmentsFromRemoteSegmentStore(false, true, true);
43564357
}
43574358
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43584359
onNewEngine(newEngineReference.get());
@@ -4456,13 +4457,14 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
44564457
indexInput,
44574458
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
44584459
);
4460+
44594461
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
44604462
if (shouldCommit) {
4461-
finalizeReplication(infosSnapshot);
4462-
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
4463+
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
44634464
}
44644465
else {
4465-
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
4466+
finalizeReplication(infosSnapshot);
4467+
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infosSnapshot);
44664468
}
44674469
}
44684470
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreSegmentUploadNotificationPublisher.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.opensearch.common.inject.Inject;
1212
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
1313
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
14-
14+
import org.opensearch.xreplication.actions.notifysecondary.NotifySecondariesAction;
1515

1616
/**
1717
* Hook to publish notification after primary uploads segments to the remote store.
@@ -20,18 +20,21 @@
2020
*/
2121
public class RemoteStoreSegmentUploadNotificationPublisher {
2222
private final SegmentReplicationCheckpointPublisher segRepPublisher;
23+
private final NotifySecondariesAction xReplicatePublisher;
2324

2425
@Inject
25-
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher) {
26+
public RemoteStoreSegmentUploadNotificationPublisher(SegmentReplicationCheckpointPublisher segRepPublisher, NotifySecondariesAction xReplicatePublisher) {
2627
this.segRepPublisher = segRepPublisher;
28+
this.xReplicatePublisher = xReplicatePublisher;
2729
}
2830

2931
public void notifySegmentUpload(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
30-
// TODO: Add separate publisher for CCR.
3132
// we don't call indexShard.getLatestReplicationCheckpoint() as it might have a newer refreshed checkpoint.
3233
// Instead we send the one which has been uploaded to remote store.
34+
// TODO: Parallise both the notifications.
3335
if (segRepPublisher != null) segRepPublisher.publish(indexShard, checkpoint);
36+
if (xReplicatePublisher != null) xReplicatePublisher.publish(indexShard, checkpoint);
3437
}
3538

36-
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null);
39+
public static final RemoteStoreSegmentUploadNotificationPublisher EMPTY = new RemoteStoreSegmentUploadNotificationPublisher(null, null);
3740
}

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
457457
remoteStore.incRef();
458458
try {
459459
// Download segments from remote segment store
460-
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
460+
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
461461

462462
if (store.directory().listAll().length == 0) {
463463
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw
4141
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
4242
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
4343
BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath();
44-
commonBlobPath = commonBlobPath.add(indexSettings.getIndex().getUUID())
44+
String indexUUID = indexSettings.getIndex().getUUID();
45+
if (indexSettings.isCCRReplicatingIndex()) {
46+
indexUUID = indexSettings.getCCRReplicatingFrom();
47+
}
48+
commonBlobPath = commonBlobPath.add(indexUUID)
4549
.add(String.valueOf(path.getShardId().getId()))
4650
.add("segments");
4751

server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
7373
ShardPath localShardPath,
7474
BlobStoreRepository blobStoreRepository
7575
) throws IOException {
76-
final BlobPath blobPath = blobStoreRepository.basePath()
77-
.add("indices")
78-
.add(IndexSettings.SEARCHABLE_SNAPSHOT_INDEX_ID.get(indexSettings.getSettings()))
79-
.add(Integer.toString(localShardPath.getShardId().getId()));
76+
final BlobPath blobPath = getBlobPath(indexSettings, localShardPath, blobStoreRepository);
8077
final SnapshotId snapshotId = new SnapshotId(
8178
IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME.get(indexSettings.getSettings()),
8279
IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID.get(indexSettings.getSettings())
@@ -94,4 +91,15 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
9491
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
9592
});
9693
}
94+
95+
private BlobPath getBlobPath(IndexSettings indexSettings, ShardPath localShardPath, BlobStoreRepository blobStoreRepository) {
96+
if(indexSettings.isCCRReplicatingIndex()) {
97+
return blobStoreRepository.basePath().add(indexSettings.getCCRRemotePath());
98+
} else {
99+
return blobStoreRepository.basePath()
100+
.add("indices")
101+
.add(IndexSettings.SEARCHABLE_SNAPSHOT_INDEX_ID.get(indexSettings.getSettings()))
102+
.add(Integer.toString(localShardPath.getShardId().getId()));
103+
}
104+
}
97105
}

server/src/main/java/org/opensearch/index/translog/TranslogHeader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.index.translog;
3434

35+
import org.apache.logging.log4j.Logger;
3536
import org.apache.lucene.codecs.CodecUtil;
3637
import org.apache.lucene.index.CorruptIndexException;
3738
import org.apache.lucene.index.IndexFormatTooNewException;
@@ -43,6 +44,7 @@
4344
import org.opensearch.common.io.stream.InputStreamStreamInput;
4445
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
4546
import org.opensearch.common.io.stream.StreamInput;
47+
import org.opensearch.common.logging.Loggers;
4648

4749
import java.io.EOFException;
4850
import java.io.IOException;
@@ -180,13 +182,16 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
180182

181183
// verify UUID only after checksum, to ensure that UUID is not corrupted
182184
final BytesRef expectedUUID = new BytesRef(translogUUID);
185+
Logger logger = Loggers.getLogger(TranslogHeader.class, "");
186+
// Skipping the check for now.
187+
/*
183188
if (uuid.bytesEquals(expectedUUID) == false) {
184189
throw new TranslogCorruptedException(
185190
path.toString(),
186191
"expected shard UUID " + expectedUUID + " but got: " + uuid + " this translog file belongs to a different translog"
187192
);
188193
}
189-
194+
*/
190195
return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
191196
} catch (EOFException e) {
192197
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);

server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
246246
indexShard.prepareForIndexRecovery();
247247
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
248248
if (hasRemoteSegmentStore) {
249-
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, false);
249+
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
250250
}
251251
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
252252
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();

0 commit comments

Comments
 (0)