Skip to content

Commit 69bf700

Browse files
shailendra0811kaushalmahi12
authored andcommitted
[Remote Routing Table] Implement write and read flow for shard diff file. (opensearch-project#14684)
* Implement write and read flow to upload/download shard diff file. Signed-off-by: Shailendra Singh <singhlhs@amazon.com> Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent 57f1cbc commit 69bf700

20 files changed

+1663
-76
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2121
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
2222
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
2323
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
24+
- Add shard-diff path to diff manifest to reduce number of read calls remote store (([#14684](https://github.com/opensearch-project/OpenSearch/pull/14684)))
2425
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
2526
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
2627
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.gateway.remote;
1010

11+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1112
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
1213
import org.opensearch.cluster.metadata.IndexMetadata;
1314
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -32,16 +33,19 @@
3233
import java.util.Optional;
3334
import java.util.Set;
3435
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.TimeUnit;
3537
import java.util.concurrent.atomic.AtomicInteger;
3638

3739
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
3840
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
3941
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
42+
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
4043
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
4144

4245
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
4346
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
4447
private static final String INDEX_NAME = "test-index";
48+
private static final String INDEX_NAME_1 = "test-index-1";
4549
BlobPath indexRoutingPath;
4650
AtomicInteger indexRoutingFiles = new AtomicInteger();
4751
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;
@@ -72,7 +76,13 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
7276
RemoteClusterStateService.class
7377
);
7478
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
75-
verifyUpdatesInManifestFile(remoteManifestManager);
79+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
80+
getClusterState().getClusterName().value(),
81+
getClusterState().getMetadata().clusterUUID()
82+
);
83+
List<String> expectedIndexNames = new ArrayList<>();
84+
List<String> deletedIndexNames = new ArrayList<>();
85+
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
7686

7787
List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
7888
assertTrue(areRoutingTablesSame(routingTableVersions));
@@ -86,7 +96,11 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
8696
assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3);
8797
});
8898

89-
verifyUpdatesInManifestFile(remoteManifestManager);
99+
latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
100+
getClusterState().getClusterName().value(),
101+
getClusterState().getMetadata().clusterUUID()
102+
);
103+
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
90104

91105
routingTableVersions = getRoutingTableFromAllNodes();
92106
assertTrue(areRoutingTablesSame(routingTableVersions));
@@ -98,6 +112,42 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
98112
assertTrue(areRoutingTablesSame(routingTableVersions));
99113
}
100114

115+
public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception {
116+
prepareClusterAndVerifyRepository();
117+
118+
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
119+
RemoteClusterStateService.class
120+
);
121+
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
122+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
123+
getClusterState().getClusterName().value(),
124+
getClusterState().getMetadata().clusterUUID()
125+
);
126+
List<String> expectedIndexNames = new ArrayList<>();
127+
List<String> deletedIndexNames = new ArrayList<>();
128+
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
129+
130+
List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
131+
assertTrue(areRoutingTablesSame(routingTableVersions));
132+
133+
// Update cluster settings
134+
ClusterUpdateSettingsResponse response = client().admin()
135+
.cluster()
136+
.prepareUpdateSettings()
137+
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS))
138+
.get();
139+
assertTrue(response.isAcknowledged());
140+
141+
latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
142+
getClusterState().getClusterName().value(),
143+
getClusterState().getMetadata().clusterUUID()
144+
);
145+
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, false);
146+
147+
routingTableVersions = getRoutingTableFromAllNodes();
148+
assertTrue(areRoutingTablesSame(routingTableVersions));
149+
}
150+
101151
public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
102152
BlobStoreRepository repository = prepareClusterAndVerifyRepository();
103153

@@ -124,10 +174,16 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
124174
RemoteClusterStateService.class
125175
);
126176
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
127-
verifyUpdatesInManifestFile(remoteManifestManager);
177+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
178+
getClusterState().getClusterName().value(),
179+
getClusterState().getMetadata().clusterUUID()
180+
);
181+
List<String> expectedIndexNames = new ArrayList<>();
182+
List<String> deletedIndexNames = new ArrayList<>();
183+
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
128184
}
129185

130-
public void testRemoteRoutingTableIndexMasterRestart1() throws Exception {
186+
public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
131187
BlobStoreRepository repository = prepareClusterAndVerifyRepository();
132188

133189
List<RoutingTable> routingTableVersions = getRoutingTableFromAllNodes();
@@ -153,7 +209,13 @@ public void testRemoteRoutingTableIndexMasterRestart1() throws Exception {
153209
RemoteClusterStateService.class
154210
);
155211
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
156-
verifyUpdatesInManifestFile(remoteManifestManager);
212+
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
213+
getClusterState().getClusterName().value(),
214+
getClusterState().getMetadata().clusterUUID()
215+
);
216+
List<String> expectedIndexNames = new ArrayList<>();
217+
List<String> deletedIndexNames = new ArrayList<>();
218+
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);
157219
}
158220

159221
private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception {
@@ -208,18 +270,23 @@ private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID
208270
);
209271
}
210272

211-
private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) {
212-
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
213-
getClusterState().getClusterName().value(),
214-
getClusterState().getMetadata().clusterUUID()
215-
);
273+
private void verifyUpdatesInManifestFile(
274+
Optional<ClusterMetadataManifest> latestManifest,
275+
List<String> expectedIndexNames,
276+
int expectedIndicesRoutingFilesInManifest,
277+
List<String> expectedDeletedIndex,
278+
boolean isRoutingTableDiffFileExpected
279+
) {
216280
assertTrue(latestManifest.isPresent());
217281
ClusterMetadataManifest manifest = latestManifest.get();
218-
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME));
219-
assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty());
220-
assertFalse(manifest.getIndicesRouting().isEmpty());
221-
assertEquals(1, manifest.getIndicesRouting().size());
222-
assertTrue(manifest.getIndicesRouting().get(0).getUploadedFilename().contains(indexRoutingPath.buildAsString()));
282+
283+
assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated());
284+
assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted());
285+
assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size());
286+
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) {
287+
assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString()));
288+
}
289+
assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null);
223290
}
224291

225292
private List<RoutingTable> getRoutingTableFromAllNodes() throws ExecutionException, InterruptedException {
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing;
10+
11+
import org.opensearch.cluster.Diff;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
15+
import java.io.IOException;
16+
import java.util.ArrayList;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
/**
22+
* Represents a difference between {@link RoutingTable} objects that can be serialized and deserialized.
23+
*/
24+
public class RoutingTableIncrementalDiff implements Diff<RoutingTable> {
25+
26+
private final Map<String, Diff<IndexRoutingTable>> diffs;
27+
28+
/**
29+
* Constructs a new RoutingTableIncrementalDiff with the given differences.
30+
*
31+
* @param diffs a map containing the differences of {@link IndexRoutingTable}.
32+
*/
33+
public RoutingTableIncrementalDiff(Map<String, Diff<IndexRoutingTable>> diffs) {
34+
this.diffs = diffs;
35+
}
36+
37+
/**
38+
* Gets the map of differences of {@link IndexRoutingTable}.
39+
*
40+
* @return a map containing the differences.
41+
*/
42+
public Map<String, Diff<IndexRoutingTable>> getDiffs() {
43+
return diffs;
44+
}
45+
46+
/**
47+
* Reads a {@link RoutingTableIncrementalDiff} from the given {@link StreamInput}.
48+
*
49+
* @param in the input stream to read from.
50+
* @return the deserialized RoutingTableIncrementalDiff.
51+
* @throws IOException if an I/O exception occurs while reading from the stream.
52+
*/
53+
public static RoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
54+
int size = in.readVInt();
55+
Map<String, Diff<IndexRoutingTable>> diffs = new HashMap<>();
56+
57+
for (int i = 0; i < size; i++) {
58+
String key = in.readString();
59+
Diff<IndexRoutingTable> diff = IndexRoutingTableIncrementalDiff.readFrom(in);
60+
diffs.put(key, diff);
61+
}
62+
return new RoutingTableIncrementalDiff(diffs);
63+
}
64+
65+
/**
66+
* Applies the differences to the provided {@link RoutingTable}.
67+
*
68+
* @param part the original RoutingTable to which the differences will be applied.
69+
* @return the updated RoutingTable with the applied differences.
70+
*/
71+
@Override
72+
public RoutingTable apply(RoutingTable part) {
73+
RoutingTable.Builder builder = new RoutingTable.Builder();
74+
for (IndexRoutingTable indexRoutingTable : part) {
75+
builder.add(indexRoutingTable); // Add existing index routing tables to builder
76+
}
77+
78+
// Apply the diffs
79+
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
80+
builder.add(entry.getValue().apply(part.index(entry.getKey())));
81+
}
82+
83+
return builder.build();
84+
}
85+
86+
/**
87+
* Writes the differences to the given {@link StreamOutput}.
88+
*
89+
* @param out the output stream to write to.
90+
* @throws IOException if an I/O exception occurs while writing to the stream.
91+
*/
92+
@Override
93+
public void writeTo(StreamOutput out) throws IOException {
94+
out.writeVInt(diffs.size());
95+
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
96+
out.writeString(entry.getKey());
97+
entry.getValue().writeTo(out);
98+
}
99+
}
100+
101+
/**
102+
* Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized.
103+
*/
104+
public static class IndexRoutingTableIncrementalDiff implements Diff<IndexRoutingTable> {
105+
106+
private final List<IndexShardRoutingTable> indexShardRoutingTables;
107+
108+
/**
109+
* Constructs a new IndexShardRoutingTableDiff with the given shard routing tables.
110+
*
111+
* @param indexShardRoutingTables a list of IndexShardRoutingTable representing the differences.
112+
*/
113+
public IndexRoutingTableIncrementalDiff(List<IndexShardRoutingTable> indexShardRoutingTables) {
114+
this.indexShardRoutingTables = indexShardRoutingTables;
115+
}
116+
117+
/**
118+
* Applies the differences to the provided {@link IndexRoutingTable}.
119+
*
120+
* @param part the original IndexRoutingTable to which the differences will be applied.
121+
* @return the updated IndexRoutingTable with the applied differences.
122+
*/
123+
@Override
124+
public IndexRoutingTable apply(IndexRoutingTable part) {
125+
IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex());
126+
for (IndexShardRoutingTable shardRoutingTable : part) {
127+
builder.addIndexShard(shardRoutingTable); // Add existing shards to builder
128+
}
129+
130+
// Apply the diff: update or add the new shard routing tables
131+
for (IndexShardRoutingTable diffShard : indexShardRoutingTables) {
132+
builder.addIndexShard(diffShard);
133+
}
134+
return builder.build();
135+
}
136+
137+
/**
138+
* Writes the differences to the given {@link StreamOutput}.
139+
*
140+
* @param out the output stream to write to.
141+
* @throws IOException if an I/O exception occurs while writing to the stream.
142+
*/
143+
@Override
144+
public void writeTo(StreamOutput out) throws IOException {
145+
out.writeVInt(indexShardRoutingTables.size());
146+
for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) {
147+
IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out);
148+
}
149+
}
150+
151+
/**
152+
* Reads a {@link IndexRoutingTableIncrementalDiff} from the given {@link StreamInput}.
153+
*
154+
* @param in the input stream to read from.
155+
* @return the deserialized IndexShardRoutingTableDiff.
156+
* @throws IOException if an I/O exception occurs while reading from the stream.
157+
*/
158+
public static IndexRoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
159+
int size = in.readVInt();
160+
List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size);
161+
for (int i = 0; i < size; i++) {
162+
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
163+
indexShardRoutingTables.add(shardRoutingTable);
164+
}
165+
return new IndexRoutingTableIncrementalDiff(indexShardRoutingTables);
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)