Skip to content

Commit c429861

Browse files
authored
[Remote State] Upload incremental cluster state on master re-election (#15145) (#15853)
* Upload incremental cluster state on master re-election Signed-off-by: Shivansh Arora <hishiv@amazon.com> (cherry picked from commit cbdcbb7)
1 parent 725ed36 commit c429861

File tree

12 files changed

+677
-126
lines changed

12 files changed

+677
-126
lines changed

release-notes/opensearch.release-notes-2.17.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
5353
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
5454
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
55+
- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145))
5556

5657
### Dependencies
5758
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {
6262
private static final String REMOTE_STATE_PREFIX = "!";
6363
private static final String REMOTE_ROUTING_PREFIX = "_";
6464
private boolean isRemoteStateEnabled = true;
65-
private String isRemotePublicationEnabled = "true";
65+
private boolean isRemotePublicationEnabled = true;
6666
private boolean hasRemoteStateCharPrefix;
6767
private boolean hasRemoteRoutingCharPrefix;
6868

6969
@Before
7070
public void setup() {
7171
asyncUploadMockFsRepo = false;
7272
isRemoteStateEnabled = true;
73-
isRemotePublicationEnabled = "true";
73+
isRemotePublicationEnabled = true;
7474
hasRemoteStateCharPrefix = randomBoolean();
7575
hasRemoteRoutingCharPrefix = randomBoolean();
7676
}
@@ -100,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
100100
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
101101
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
102102
)
103+
.put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled)
103104
.put(
104105
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
105106
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""

server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
7676
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
7777
verifyRestoredData(indexStats, indexName, true);
7878
}
79-
80-
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
81-
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
82-
}
83-
84-
public void prepareCluster(
85-
int numClusterManagerNodes,
86-
int numDataOnlyNodes,
87-
String indices,
88-
int replicaCount,
89-
int shardCount,
90-
Settings settings
91-
) {
92-
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
93-
for (String index : indices.split(",")) {
94-
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
95-
ensureYellowAndNoInitializingShards(index);
96-
ensureGreen(index);
97-
}
98-
}
99-
100-
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
101-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
102-
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
103-
}
10479
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
351351
}
352352

353353
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
354-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
355-
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
356-
for (String index : indices.split(",")) {
357-
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
358-
ensureYellowAndNoInitializingShards(index);
359-
ensureGreen(index);
360-
}
354+
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
361355
}
362356

363357
protected void prepareCluster(
@@ -368,11 +362,16 @@ protected void prepareCluster(
368362
int shardCount,
369363
Settings settings
370364
) {
371-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
372-
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
365+
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
373366
for (String index : indices.split(",")) {
374367
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
368+
ensureYellowAndNoInitializingShards(index);
375369
ensureGreen(index);
376370
}
377371
}
372+
373+
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
374+
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
375+
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
376+
}
378377
}

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.cluster.node.DiscoveryNode;
4141
import org.opensearch.common.settings.Settings;
4242
import org.opensearch.common.util.io.IOUtils;
43+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
4344

4445
import java.io.Closeable;
4546
import java.io.IOException;
@@ -104,6 +105,7 @@ public CoordinationState(
104105
.getLastAcceptedConfiguration();
105106
this.publishVotes = new VoteCollection();
106107
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
108+
// ToDo: revisit this check while making the setting dynamic
107109
this.isRemotePublicationEnabled = isRemoteStateEnabled
108110
&& REMOTE_PUBLICATION_SETTING.get(settings)
109111
&& localNode.isRemoteStatePublicationEnabled();
@@ -459,6 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
459461
clusterState.term()
460462
);
461463
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
464+
if (shouldUpdateRemotePersistedState(publishRequest)) {
465+
updateRemotePersistedStateOnPublishRequest(publishRequest);
466+
}
462467
assert getLastAcceptedState() == clusterState;
463468

464469
return new PublishResponse(clusterState.term(), clusterState.version());
@@ -571,6 +576,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
571576
);
572577

573578
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
579+
if (shouldCommitRemotePersistedState()) {
580+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
581+
}
574582
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
575583
}
576584

@@ -616,6 +624,33 @@ public void close() throws IOException {
616624
IOUtils.close(persistedStateRegistry);
617625
}
618626

627+
private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
628+
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
629+
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false;
630+
}
631+
632+
private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
633+
if (publishRequest instanceof RemoteStatePublishRequest) {
634+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState());
635+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)
636+
.setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest());
637+
} else {
638+
// We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node
639+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
640+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
641+
}
642+
}
643+
644+
private boolean shouldCommitRemotePersistedState() {
645+
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
646+
&& persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
647+
.getLastAcceptedState()
648+
.getNodes()
649+
.isLocalNodeElectedClusterManager() == false
650+
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null
651+
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
652+
}
653+
619654
/**
620655
* Pluggable persistence layer for {@link CoordinationState}.
621656
*
@@ -653,6 +688,22 @@ public interface PersistedState extends Closeable {
653688
*/
654689
PersistedStateStats getStats();
655690

691+
/**
692+
* Returns the last accepted {@link ClusterMetadataManifest}.
693+
*
694+
* @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
695+
* has been accepted yet.
696+
*/
697+
default ClusterMetadataManifest getLastAcceptedManifest() {
698+
// return null by default, this method needs to be overridden wherever required
699+
return null;
700+
}
701+
702+
/**
703+
* Sets the last accepted {@link ClusterMetadataManifest}.
704+
*/
705+
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}
706+
656707
/**
657708
* Marks the last accepted cluster state as committed.
658709
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
@@ -661,14 +712,7 @@ public interface PersistedState extends Closeable {
661712
*/
662713
default void markLastAcceptedStateAsCommitted() {
663714
final ClusterState lastAcceptedState = getLastAcceptedState();
664-
Metadata.Builder metadataBuilder = null;
665-
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
666-
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
667-
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
668-
.build();
669-
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
670-
metadataBuilder.coordinationMetadata(coordinationMetadata);
671-
}
715+
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
672716
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
673717
// the cluster uuid might not been known yet.
674718
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
@@ -693,6 +737,18 @@ default void markLastAcceptedStateAsCommitted() {
693737
}
694738
}
695739

740+
default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
741+
Metadata.Builder metadataBuilder = null;
742+
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
743+
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
744+
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
745+
.build();
746+
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
747+
metadataBuilder.coordinationMetadata(coordinationMetadata);
748+
}
749+
return metadataBuilder;
750+
}
751+
696752
default void close() throws IOException {}
697753
}
698754

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
199199
}
200200
fullClusterStateReceivedCount.incrementAndGet();
201201
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
202-
final PublishWithJoinResponse response = acceptState(incomingState);
202+
final PublishWithJoinResponse response = acceptState(incomingState, null);
203203
lastSeenClusterState.set(incomingState);
204204
return response;
205205
} else {
@@ -230,7 +230,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
230230
incomingState.stateUUID(),
231231
request.bytes().length()
232232
);
233-
final PublishWithJoinResponse response = acceptState(incomingState);
233+
final PublishWithJoinResponse response = acceptState(incomingState, null);
234234
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
235235
return response;
236236
}
@@ -281,7 +281,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
281281
true
282282
);
283283
fullClusterStateReceivedCount.incrementAndGet();
284-
final PublishWithJoinResponse response = acceptState(clusterState);
284+
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
285285
lastSeenClusterState.set(clusterState);
286286
return response;
287287
} else {
@@ -300,7 +300,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
300300
transportService.getLocalNode().getId()
301301
);
302302
compatibleClusterStateDiffReceivedCount.incrementAndGet();
303-
final PublishWithJoinResponse response = acceptState(clusterState);
303+
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
304304
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
305305
return response;
306306
}
@@ -314,7 +314,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
314314
}
315315
}
316316

317-
private PublishWithJoinResponse acceptState(ClusterState incomingState) {
317+
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
318318
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
319319
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
320320
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
@@ -324,6 +324,9 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
324324
return handlePublishRequest.apply(publishRequest);
325325
}
326326
}
327+
if (manifest != null) {
328+
return handlePublishRequest.apply(new RemoteStatePublishRequest(incomingState, manifest));
329+
}
327330
return handlePublishRequest.apply(new PublishRequest(incomingState));
328331
}
329332

@@ -539,7 +542,7 @@ public String executor() {
539542
}
540543

541544
public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
542-
logger.info("sending cluster state over transport to node: {}", destination.getName());
545+
logger.debug("sending cluster state over transport to node: {}", destination.getName());
543546
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
544547
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
545548
sendFullClusterState(destination, listener);
@@ -639,7 +642,7 @@ public class RemotePublicationContext extends PublicationContext {
639642
@Override
640643
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
641644
try {
642-
logger.info("sending remote cluster state to node: {}", destination.getName());
645+
logger.debug("sending remote cluster state to node: {}", destination.getName());
643646
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
644647
.getLastUploadedManifestFile();
645648
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.coordination;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
13+
14+
import java.util.Objects;
15+
16+
/**
17+
* PublishRequest created by downloading the accepted {@link ClusterState} from Remote Store, using the published {@link ClusterMetadataManifest}
18+
*
19+
* @opensearch.internal
20+
*/
21+
public class RemoteStatePublishRequest extends PublishRequest {
22+
private final ClusterMetadataManifest manifest;
23+
24+
public RemoteStatePublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) {
25+
super(acceptedState);
26+
this.manifest = acceptedManifest;
27+
}
28+
29+
public ClusterMetadataManifest getAcceptedManifest() {
30+
return manifest;
31+
}
32+
33+
@Override
34+
public boolean equals(Object o) {
35+
if (this == o) return true;
36+
if (o == null || getClass() != o.getClass()) return false;
37+
if (!super.equals(o)) return false;
38+
RemoteStatePublishRequest that = (RemoteStatePublishRequest) o;
39+
return Objects.equals(manifest, that.manifest);
40+
}
41+
42+
@Override
43+
public int hashCode() {
44+
return Objects.hash(super.hashCode(), manifest);
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return "RemoteStatePublishRequest{" + super.toString() + "manifest=" + manifest + "} ";
50+
}
51+
}

0 commit comments

Comments
 (0)