Skip to content

Commit a37d8da

Browse files
himshikhaBukhtawarHimshikha Guptaalchemist51
authored and
wangdongyu.danny
committed
[Remote Routing Table] Initial commit for index routing table manifest (opensearch-project#13577)
* Initial commit for index routing table manifest Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com> Co-authored-by: Himshikha Gupta <himshikh@amazon.com> Co-authored-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent c66be2c commit a37d8da

17 files changed

+543
-24
lines changed

server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java renamed to libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamInput.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,10 @@
3030
* GitHub history for details.
3131
*/
3232

33-
package org.opensearch.index.translog;
33+
package org.opensearch.core.common.io.stream;
3434

3535
import org.apache.lucene.store.BufferedChecksum;
3636
import org.apache.lucene.util.BitUtil;
37-
import org.opensearch.core.common.io.stream.FilterStreamInput;
38-
import org.opensearch.core.common.io.stream.StreamInput;
3937

4038
import java.io.EOFException;
4139
import java.io.IOException;

server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamOutput.java renamed to libs/core/src/main/java/org/opensearch/core/common/io/stream/BufferedChecksumStreamOutput.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,10 @@
3030
* GitHub history for details.
3131
*/
3232

33-
package org.opensearch.index.translog;
33+
package org.opensearch.core.common.io.stream;
3434

3535
import org.apache.lucene.store.BufferedChecksum;
3636
import org.opensearch.common.annotation.PublicApi;
37-
import org.opensearch.core.common.io.stream.StreamOutput;
3837

3938
import java.io.IOException;
4039
import java.util.zip.CRC32;

server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
4141
public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
4242
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.
4343
public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file.
44+
public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata in manifest file.
4445

4546
private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
4647
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
@@ -58,6 +59,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
5859
private static final ParseField UPLOADED_SETTINGS_METADATA = new ParseField("uploaded_settings_metadata");
5960
private static final ParseField UPLOADED_TEMPLATES_METADATA = new ParseField("uploaded_templates_metadata");
6061
private static final ParseField UPLOADED_CUSTOM_METADATA = new ParseField("uploaded_custom_metadata");
62+
private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version");
63+
private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing");
6164

6265
private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) {
6366
return ClusterMetadataManifest.builder()
@@ -86,6 +89,12 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields
8689
.customMetadataMap(customMetadata(fields));
8790
}
8891

92+
private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) {
93+
return manifestV2Builder(fields).codecVersion(codecVersion(fields))
94+
.routingTableVersion(routingTableVersion(fields))
95+
.indicesRouting(indicesRouting(fields));
96+
}
97+
8998
private static long term(Object[] fields) {
9099
return (long) fields[0];
91100
}
@@ -151,6 +160,14 @@ private static Map<String, UploadedMetadataAttribute> customMetadata(Object[] fi
151160
return customs.stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity()));
152161
}
153162

163+
private static long routingTableVersion(Object[] fields) {
164+
return (long) fields[15];
165+
}
166+
167+
private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
168+
return (List<UploadedIndexMetadata>) fields[16];
169+
}
170+
154171
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
155172
"cluster_metadata_manifest",
156173
fields -> manifestV0Builder(fields).build()
@@ -166,12 +183,18 @@ private static Map<String, UploadedMetadataAttribute> customMetadata(Object[] fi
166183
fields -> manifestV2Builder(fields).build()
167184
);
168185

169-
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V2;
186+
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V3 = new ConstructingObjectParser<>(
187+
"cluster_metadata_manifest",
188+
fields -> manifestV3Builder(fields).build()
189+
);
190+
191+
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V3;
170192

171193
static {
172194
declareParser(PARSER_V0, CODEC_V0);
173195
declareParser(PARSER_V1, CODEC_V1);
174196
declareParser(PARSER_V2, CODEC_V2);
197+
declareParser(PARSER_V3, CODEC_V3);
175198
}
176199

177200
private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
@@ -216,6 +239,14 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
216239
UPLOADED_CUSTOM_METADATA
217240
);
218241
}
242+
if (codec_version >= CODEC_V3) {
243+
parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD);
244+
parser.declareObjectArray(
245+
ConstructingObjectParser.constructorArg(),
246+
(p, c) -> UploadedIndexMetadata.fromXContent(p),
247+
INDICES_ROUTING_FIELD
248+
);
249+
}
219250
}
220251

221252
private final int codecVersion;
@@ -234,6 +265,8 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
234265
private final boolean committed;
235266
private final String previousClusterUUID;
236267
private final boolean clusterUUIDCommitted;
268+
private final long routingTableVersion;
269+
private final List<UploadedIndexMetadata> indicesRouting;
237270

238271
public List<UploadedIndexMetadata> getIndices() {
239272
return indices;
@@ -306,6 +339,14 @@ public boolean hasMetadataAttributesFiles() {
306339
|| !uploadedCustomMetadataMap.isEmpty();
307340
}
308341

342+
public long getRoutingTableVersion() {
343+
return routingTableVersion;
344+
}
345+
346+
public List<UploadedIndexMetadata> getIndicesRouting() {
347+
return indicesRouting;
348+
}
349+
309350
public ClusterMetadataManifest(
310351
long clusterTerm,
311352
long version,
@@ -322,7 +363,9 @@ public ClusterMetadataManifest(
322363
UploadedMetadataAttribute uploadedCoordinationMetadata,
323364
UploadedMetadataAttribute uploadedSettingsMetadata,
324365
UploadedMetadataAttribute uploadedTemplatesMetadata,
325-
Map<String, UploadedMetadataAttribute> uploadedCustomMetadataMap
366+
Map<String, UploadedMetadataAttribute> uploadedCustomMetadataMap,
367+
long routingTableVersion,
368+
List<UploadedIndexMetadata> indicesRouting
326369
) {
327370
this.clusterTerm = clusterTerm;
328371
this.stateVersion = version;
@@ -336,6 +379,8 @@ public ClusterMetadataManifest(
336379
this.indices = Collections.unmodifiableList(indices);
337380
this.previousClusterUUID = previousClusterUUID;
338381
this.clusterUUIDCommitted = clusterUUIDCommitted;
382+
this.routingTableVersion = routingTableVersion;
383+
this.indicesRouting = Collections.unmodifiableList(indicesRouting);
339384
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
340385
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
341386
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
@@ -364,20 +409,26 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
364409
in.readMap(StreamInput::readString, UploadedMetadataAttribute::new)
365410
);
366411
this.globalMetadataFileName = null;
412+
this.routingTableVersion = in.readLong();
413+
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
367414
} else if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
368415
this.codecVersion = in.readInt();
369416
this.globalMetadataFileName = in.readString();
370417
this.uploadedCoordinationMetadata = null;
371418
this.uploadedSettingsMetadata = null;
372419
this.uploadedTemplatesMetadata = null;
373420
this.uploadedCustomMetadataMap = null;
421+
this.routingTableVersion = -1;
422+
this.indicesRouting = null;
374423
} else {
375424
this.codecVersion = CODEC_V0; // Default codec
376425
this.globalMetadataFileName = null;
377426
this.uploadedCoordinationMetadata = null;
378427
this.uploadedSettingsMetadata = null;
379428
this.uploadedTemplatesMetadata = null;
380429
this.uploadedCustomMetadataMap = null;
430+
this.routingTableVersion = -1;
431+
this.indicesRouting = null;
381432
}
382433
}
383434

@@ -401,7 +452,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
401452
builder.startArray(INDICES_FIELD.getPreferredName());
402453
{
403454
for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
455+
builder.startObject();
404456
uploadedIndexMetadata.toXContent(builder, params);
457+
builder.endObject();
405458
}
406459
}
407460
builder.endArray();
@@ -433,6 +486,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
433486
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
434487
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
435488
}
489+
if (onOrAfterCodecVersion(CODEC_V3)) {
490+
builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion());
491+
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
492+
{
493+
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
494+
builder.startObject();
495+
uploadedIndexMetadata.toXContent(builder, params);
496+
builder.endObject();
497+
}
498+
}
499+
builder.endArray();
500+
}
436501
return builder;
437502
}
438503

@@ -454,6 +519,8 @@ public void writeTo(StreamOutput out) throws IOException {
454519
uploadedSettingsMetadata.writeTo(out);
455520
uploadedTemplatesMetadata.writeTo(out);
456521
out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o));
522+
out.writeLong(routingTableVersion);
523+
out.writeCollection(indicesRouting);
457524
} else if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
458525
out.writeInt(codecVersion);
459526
out.writeString(globalMetadataFileName);
@@ -480,7 +547,9 @@ public boolean equals(Object o) {
480547
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
481548
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
482549
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
483-
&& Objects.equals(codecVersion, that.codecVersion);
550+
&& Objects.equals(codecVersion, that.codecVersion)
551+
&& Objects.equals(routingTableVersion, that.routingTableVersion)
552+
&& Objects.equals(indicesRouting, that.indicesRouting);
484553
}
485554

486555
@Override
@@ -497,7 +566,9 @@ public int hashCode() {
497566
nodeId,
498567
committed,
499568
previousClusterUUID,
500-
clusterUUIDCommitted
569+
clusterUUIDCommitted,
570+
routingTableVersion,
571+
indicesRouting
501572
);
502573
}
503574

@@ -518,6 +589,10 @@ public static ClusterMetadataManifest fromXContentV1(XContentParser parser) thro
518589
return PARSER_V1.parse(parser, null);
519590
}
520591

592+
public static ClusterMetadataManifest fromXContentV2(XContentParser parser) throws IOException {
593+
return PARSER_V2.parse(parser, null);
594+
}
595+
521596
public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
522597
return CURRENT_PARSER.parse(parser, null);
523598
}
@@ -545,12 +620,24 @@ public static class Builder {
545620
private String previousClusterUUID;
546621
private boolean committed;
547622
private boolean clusterUUIDCommitted;
623+
private long routingTableVersion;
624+
private List<UploadedIndexMetadata> indicesRouting;
548625

549626
public Builder indices(List<UploadedIndexMetadata> indices) {
550627
this.indices = indices;
551628
return this;
552629
}
553630

631+
public Builder routingTableVersion(long routingTableVersion) {
632+
this.routingTableVersion = routingTableVersion;
633+
return this;
634+
}
635+
636+
public Builder indicesRouting(List<UploadedIndexMetadata> indicesRouting) {
637+
this.indicesRouting = indicesRouting;
638+
return this;
639+
}
640+
554641
public Builder codecVersion(int codecVersion) {
555642
this.codecVersion = codecVersion;
556643
return this;
@@ -625,6 +712,10 @@ public List<UploadedIndexMetadata> getIndices() {
625712
return indices;
626713
}
627714

715+
public List<UploadedIndexMetadata> getIndicesRouting() {
716+
return indicesRouting;
717+
}
718+
628719
public Builder previousClusterUUID(String previousClusterUUID) {
629720
this.previousClusterUUID = previousClusterUUID;
630721
return this;
@@ -638,6 +729,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
638729
public Builder() {
639730
indices = new ArrayList<>();
640731
customMetadataMap = new HashMap<>();
732+
indicesRouting = new ArrayList<>();
641733
}
642734

643735
public Builder(ClusterMetadataManifest manifest) {
@@ -657,6 +749,8 @@ public Builder(ClusterMetadataManifest manifest) {
657749
this.indices = new ArrayList<>(manifest.indices);
658750
this.previousClusterUUID = manifest.previousClusterUUID;
659751
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
752+
this.routingTableVersion = manifest.routingTableVersion;
753+
this.indicesRouting = new ArrayList<>(manifest.indicesRouting);
660754
}
661755

662756
public ClusterMetadataManifest build() {
@@ -676,7 +770,9 @@ public ClusterMetadataManifest build() {
676770
coordinationMetadata,
677771
settingsMetadata,
678772
templatesMetadata,
679-
customMetadataMap
773+
customMetadataMap,
774+
routingTableVersion,
775+
indicesRouting
680776
);
681777
}
682778

@@ -776,11 +872,9 @@ public String getIndexUUID() {
776872

777873
@Override
778874
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
779-
return builder.startObject()
780-
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
875+
return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
781876
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
782-
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath())
783-
.endObject();
877+
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
784878
}
785879

786880
@Override

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public class RemoteClusterStateService implements Closeable {
216216
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
217217
+ "updated : [{}], custom metadata updated : [{}]";
218218
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
219-
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2;
219+
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
220220
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;
221221

222222
// ToXContent Params with gateway mode.
@@ -806,7 +806,10 @@ private ClusterMetadataManifest uploadManifest(
806806
uploadedCoordinationMetadata,
807807
uploadedSettingsMetadata,
808808
uploadedTemplatesMetadata,
809-
uploadedCustomMetadataMap
809+
uploadedCustomMetadataMap,
810+
clusterState.routingTable().version(),
811+
// TODO: Add actual list of changed indices routing with index routing upload flow.
812+
new ArrayList<>()
810813
);
811814
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
812815
return manifest;

0 commit comments

Comments
 (0)