Skip to content

Commit 9614561

Browse files
Use remote publication flag to decide which custom objects to upload (#14338) (#14390)
* Simplify updated customs (ClusterState.Custom & Metadata.Custom) persistence logic to remote store (cherry picked from commit a3402d1) Signed-off-by: Sooraj Sinha <soosinha@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 00d5b36 commit 9614561

File tree

7 files changed

+591
-98
lines changed

7 files changed

+591
-98
lines changed

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import org.opensearch.action.LatchedActionListener;
1212
import org.opensearch.cluster.ClusterState;
13+
import org.opensearch.cluster.DiffableUtils;
14+
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
1315
import org.opensearch.common.CheckedRunnable;
1416
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
1517
import org.opensearch.common.remote.RemoteWritableEntityStore;
@@ -25,10 +27,9 @@
2527
import org.opensearch.threadpool.ThreadPool;
2628

2729
import java.io.IOException;
30+
import java.util.Collections;
2831
import java.util.HashMap;
29-
import java.util.HashSet;
3032
import java.util.Map;
31-
import java.util.Set;
3233

3334
/**
3435
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
@@ -126,18 +127,35 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
126127
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
127128
}
128129

129-
public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
130-
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
131-
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
132-
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
133-
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
134-
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
135-
}
136-
currentCustoms.remove(entry.getKey());
130+
public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
131+
ClusterState clusterState,
132+
ClusterState previousClusterState,
133+
boolean isRemotePublicationEnabled,
134+
boolean isFirstUpload
135+
) {
136+
if (!isRemotePublicationEnabled) {
137+
// When isRemotePublicationEnabled is false, we do not want store any custom objects
138+
return DiffableUtils.diff(
139+
Collections.emptyMap(),
140+
Collections.emptyMap(),
141+
DiffableUtils.getStringKeySerializer(),
142+
NonDiffableValueSerializer.getAbstractInstance()
143+
);
137144
}
138-
for (String custom : currentCustoms) {
139-
updatedCustoms.put(custom, clusterState.customs().get(custom));
145+
if (isFirstUpload) {
146+
// For first upload of ephemeral metadata, we want to upload all customs
147+
return DiffableUtils.diff(
148+
Collections.emptyMap(),
149+
clusterState.customs(),
150+
DiffableUtils.getStringKeySerializer(),
151+
NonDiffableValueSerializer.getAbstractInstance()
152+
);
140153
}
141-
return updatedCustoms;
154+
return DiffableUtils.diff(
155+
previousClusterState.customs(),
156+
clusterState.customs(),
157+
DiffableUtils.getStringKeySerializer(),
158+
NonDiffableValueSerializer.getAbstractInstance()
159+
);
142160
}
143161
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.common.settings.Setting.Property;
4040
import org.opensearch.common.settings.Settings;
4141
import org.opensearch.common.unit.TimeValue;
42+
import org.opensearch.common.util.FeatureFlags;
4243
import org.opensearch.common.util.io.IOUtils;
4344
import org.opensearch.core.action.ActionListener;
4445
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
@@ -88,6 +89,7 @@
8889

8990
import static java.util.Collections.emptyList;
9091
import static java.util.Collections.emptyMap;
92+
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
9193
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
9294
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
9395
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS;
@@ -159,6 +161,7 @@ public class RemoteClusterStateService implements Closeable {
159161
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
160162
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
161163
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
164+
private final boolean isPublicationEnabled;
162165

163166
// ToXContent Params with gateway mode.
164167
// We are using gateway context mode to persist all custom metadata.
@@ -201,6 +204,9 @@ public RemoteClusterStateService(
201204
threadPool
202205
);
203206
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
207+
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
208+
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
209+
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
204210
}
205211

206212
/**
@@ -221,15 +227,15 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
221227
clusterState,
222228
new ArrayList<>(clusterState.metadata().indices().values()),
223229
emptyMap(),
224-
clusterState.metadata().customs(),
230+
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
225231
true,
226232
true,
227233
true,
228-
true,
229-
true,
230-
true,
231-
clusterState.customs(),
232-
true,
234+
isPublicationEnabled,
235+
isPublicationEnabled,
236+
isPublicationEnabled,
237+
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
238+
isPublicationEnabled,
233239
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable())
234240
);
235241
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
@@ -285,28 +291,17 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
285291
}
286292
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();
287293

288-
final Map<String, UploadedMetadataAttribute> customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap());
289-
final Map<String, Metadata.Custom> customsToUpload = remoteGlobalMetadataManager.getUpdatedCustoms(
290-
clusterState,
291-
previousClusterState
292-
);
293-
final Map<String, UploadedMetadataAttribute> clusterStateCustomsToBeDeleted = new HashMap<>(
294+
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
295+
296+
final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
297+
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
298+
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
299+
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
300+
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
301+
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
294302
previousManifest.getClusterStateCustomMap()
295303
);
296-
final Map<String, ClusterState.Custom> clusterStateCustomsToUpload = remoteClusterStateAttributesManager.getUpdatedCustoms(
297-
clusterState,
298-
previousClusterState
299-
);
300-
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
301-
for (final String custom : clusterState.metadata().customs().keySet()) {
302-
// remove all the customs which are present currently
303-
customsToBeDeletedFromRemote.remove(custom);
304-
}
305304
final Map<String, IndexMetadata> indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices());
306-
for (final String custom : clusterState.customs().keySet()) {
307-
// remove all the custom which are present currently
308-
clusterStateCustomsToBeDeleted.remove(custom);
309-
}
310305
int numIndicesUpdated = 0;
311306
int numIndicesUnchanged = 0;
312307
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
@@ -337,42 +332,44 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
337332
indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName());
338333
}
339334

340-
DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
335+
final DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
341336
.getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable());
342-
List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
337+
final List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
343338
routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v));
344339

345340
UploadedMetadataResults uploadedMetadataResults;
346341
// For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files,
347342
// If file is empty and codec is 1 then write global metadata.
348-
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
349343
boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata
350344
|| Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
351345
;
352346
boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata
353347
|| Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
354-
boolean updateTransientSettingsMetadata = firstUploadForSplitGlobalMetadata
355-
|| Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
348+
boolean updateTransientSettingsMetadata = Metadata.isTransientSettingsMetadataEqual(
349+
previousClusterState.metadata(),
350+
clusterState.metadata()
351+
) == false;
356352
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
357353
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
358-
// ToDo: check if these needs to be updated or not
359-
final boolean updateDiscoveryNodes = clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
360-
final boolean updateClusterBlocks = !clusterState.blocks().equals(previousClusterState.blocks());
361-
final boolean updateHashesOfConsistentSettings = firstUploadForSplitGlobalMetadata
354+
355+
final boolean updateDiscoveryNodes = isPublicationEnabled
356+
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
357+
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
358+
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
362359
|| Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
363360

364361
uploadedMetadataResults = writeMetadataInParallel(
365362
clusterState,
366363
toUpload,
367364
prevIndexMetadataByName,
368-
firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload,
365+
customsDiff.getUpserts(),
369366
updateCoordinationMetadata,
370367
updateSettingsMetadata,
371368
updateTemplatesMetadata,
372369
updateDiscoveryNodes,
373370
updateClusterBlocks,
374371
updateTransientSettingsMetadata,
375-
clusterStateCustomsToUpload,
372+
clusterStateCustomsDiff.getUpserts(),
376373
updateHashesOfConsistentSettings,
377374
indicesRoutingToUpload
378375
);
@@ -382,10 +379,11 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
382379
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
383380
);
384381
allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap);
382+
allUploadedClusterStateCustomsMap.putAll(uploadedMetadataResults.uploadedClusterStateCustomMetadataMap);
385383
// remove the data for removed custom/indices
386-
customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove);
384+
customsDiff.getDeletes().forEach(allUploadedCustomMap::remove);
387385
indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove);
388-
clusterStateCustomsToBeDeleted.keySet().forEach(allUploadedCustomMap::remove);
386+
clusterStateCustomsDiff.getDeletes().forEach(allUploadedClusterStateCustomsMap::remove);
389387

390388
if (!updateCoordinationMetadata) {
391389
uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata();
@@ -399,31 +397,24 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
399397
if (!updateTemplatesMetadata) {
400398
uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata();
401399
}
402-
if (!updateDiscoveryNodes && !firstUploadForSplitGlobalMetadata) {
400+
if (!updateDiscoveryNodes) {
403401
uploadedMetadataResults.uploadedDiscoveryNodes = previousManifest.getDiscoveryNodesMetadata();
404402
}
405-
if (!updateClusterBlocks && !firstUploadForSplitGlobalMetadata) {
403+
if (!updateClusterBlocks) {
406404
uploadedMetadataResults.uploadedClusterBlocks = previousManifest.getClusterBlocksMetadata();
407405
}
408-
if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) {
406+
if (!updateHashesOfConsistentSettings) {
409407
uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings();
410408
}
411-
if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) {
412-
uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap();
413-
}
414-
if (!firstUploadForSplitGlobalMetadata && clusterStateCustomsToUpload.isEmpty()) {
415-
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = previousManifest.getClusterStateCustomMap();
416-
}
417409
uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap;
410+
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = allUploadedClusterStateCustomsMap;
418411
uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values());
419412

420-
List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = new ArrayList<>();
421-
allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting(
413+
uploadedMetadataResults.uploadedIndicesRoutingMetadata = remoteRoutingTableService.getAllUploadedIndicesRouting(
422414
previousManifest,
423415
uploadedMetadataResults.uploadedIndicesRoutingMetadata,
424416
routingTableDiff.getDeletes()
425417
);
426-
uploadedMetadataResults.uploadedIndicesRoutingMetadata = allUploadedIndicesRouting;
427418

428419
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
429420
clusterState,
@@ -448,7 +439,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
448439
updateCoordinationMetadata,
449440
updateSettingsMetadata,
450441
updateTemplatesMetadata,
451-
customsToUpload.size(),
442+
customsDiff.getUpserts().size(),
452443
indicesRoutingToUpload.size()
453444
);
454445
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
@@ -464,7 +455,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
464455
updateCoordinationMetadata,
465456
updateSettingsMetadata,
466457
updateTemplatesMetadata,
467-
customsToUpload.size()
458+
customsDiff.getUpserts().size()
468459
);
469460
} else {
470461
logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage);
@@ -479,7 +470,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
479470
updateCoordinationMetadata,
480471
updateSettingsMetadata,
481472
updateTemplatesMetadata,
482-
customsToUpload.size()
473+
customsDiff.getUpserts().size()
483474
);
484475
}
485476
return manifestDetails;

0 commit comments

Comments
 (0)