Skip to content

Commit 4516065

Browse files
authored
Limiting node attribute serialisation in DiscoveryNode (#15341)
Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
1 parent 7a9cb35 commit 4516065

File tree

20 files changed

+93
-25
lines changed

20 files changed

+93
-25
lines changed

libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
969969
}
970970

971971
public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
972+
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
973+
}
974+
975+
public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
972976
if (writeable != null) {
973977
writeBoolean(true);
974-
writeable.writeTo(this);
978+
writer.write(this, writeable);
975979
} else {
976980
writeBoolean(false);
977981
}

server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
9595
@Override
9696
public void writeTo(StreamOutput out) throws IOException {
9797
shardRouting.writeTo(out);
98-
out.writeOptionalWriteable(currentNode);
98+
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
9999
out.writeOptionalWriteable(relocationTargetNode);
100100
out.writeOptionalWriteable(clusterInfo);
101101
shardAllocationDecision.writeTo(out);

server/src/main/java/org/opensearch/action/support/nodes/BaseNodeResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,6 @@ public DiscoveryNode getNode() {
6767

6868
@Override
6969
public void writeTo(StreamOutput out) throws IOException {
70-
node.writeTo(out);
70+
node.writeToWithAttribute(out);
7171
}
7272
}

server/src/main/java/org/opensearch/cluster/ClusterState.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
781781
out.writeString(stateUUID);
782782
metadata.writeTo(out);
783783
routingTable.writeTo(out);
784-
nodes.writeTo(out);
784+
nodes.writeToWithAttribute(out);
785785
blocks.writeTo(out);
786786
// filter out custom states not supported by the other node
787787
int numberOfCustoms = 0;
@@ -887,13 +887,23 @@ public void writeTo(StreamOutput out) throws IOException {
887887
out.writeString(toUuid);
888888
out.writeLong(toVersion);
889889
routingTable.writeTo(out);
890-
nodes.writeTo(out);
890+
nodesWriteToWithAttributes(nodes, out);
891891
metadata.writeTo(out);
892892
blocks.writeTo(out);
893893
customs.writeTo(out);
894894
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
895895
}
896896

897+
private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
898+
DiscoveryNodes part = nodes.apply(null);
899+
if (part != null) {
900+
out.writeBoolean(true);
901+
part.writeToWithAttribute(out);
902+
} else {
903+
out.writeBoolean(false);
904+
}
905+
}
906+
897907
@Override
898908
public ClusterState apply(ClusterState state) {
899909
Builder builder = new Builder(clusterName);

server/src/main/java/org/opensearch/cluster/coordination/Join.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {
7878

7979
@Override
8080
public void writeTo(StreamOutput out) throws IOException {
81-
sourceNode.writeTo(out);
82-
targetNode.writeTo(out);
81+
sourceNode.writeToWithAttribute(out);
82+
targetNode.writeToWithAttribute(out);
8383
out.writeLong(term);
8484
out.writeLong(lastAcceptedTerm);
8585
out.writeLong(lastAcceptedVersion);

server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public JoinRequest(StreamInput in) throws IOException {
8484
@Override
8585
public void writeTo(StreamOutput out) throws IOException {
8686
super.writeTo(out);
87-
sourceNode.writeTo(out);
87+
sourceNode.writeToWithAttribute(out);
8888
out.writeLong(minimumTerm);
8989
out.writeOptionalWriteable(optionalJoin.orElse(null));
9090
}

server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
6464
@Override
6565
public void writeTo(StreamOutput out) throws IOException {
6666
super.writeTo(out);
67-
sourceNode.writeTo(out);
67+
sourceNode.writeToWithAttribute(out);
6868
out.writeLong(term);
6969
}
7070

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ public DiscoveryNode(StreamInput in) throws IOException {
330330
for (int i = 0; i < size; i++) {
331331
this.attributes.put(in.readString(), in.readString());
332332
}
333+
333334
int rolesSize = in.readVInt();
334335
final Set<DiscoveryNodeRole> roles = new HashSet<>(rolesSize);
335336
for (int i = 0; i < rolesSize; i++) {
@@ -359,13 +360,31 @@ public DiscoveryNode(StreamInput in) throws IOException {
359360

360361
@Override
361362
public void writeTo(StreamOutput out) throws IOException {
363+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
364+
writeToUtil(out, false);
365+
} else {
366+
writeToUtil(out, true);
367+
}
368+
369+
}
370+
371+
public void writeToWithAttribute(StreamOutput out) throws IOException {
372+
writeToUtil(out, true);
373+
}
374+
375+
public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
362376
writeNodeDetails(out);
363377

364-
out.writeVInt(attributes.size());
365-
for (Map.Entry<String, String> entry : attributes.entrySet()) {
366-
out.writeString(entry.getKey());
367-
out.writeString(entry.getValue());
378+
if (includeAllAttributes) {
379+
out.writeVInt(attributes.size());
380+
for (Map.Entry<String, String> entry : attributes.entrySet()) {
381+
out.writeString(entry.getKey());
382+
out.writeString(entry.getValue());
383+
}
384+
} else {
385+
out.writeVInt(0);
368386
}
387+
369388
writeRolesAndVersion(out);
370389
}
371390

server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,10 +690,18 @@ public String shortSummary() {
690690

691691
@Override
692692
public void writeTo(StreamOutput out) throws IOException {
693+
writeToUtil((output, value) -> value.writeTo(output), out);
694+
}
695+
696+
public void writeToWithAttribute(StreamOutput out) throws IOException {
697+
writeToUtil((output, value) -> value.writeToWithAttribute(output), out);
698+
}
699+
700+
public void writeToUtil(final Writer<DiscoveryNode> writer, StreamOutput out) throws IOException {
693701
writeClusterManager(out);
694702
out.writeVInt(nodes.size());
695703
for (DiscoveryNode node : this) {
696-
node.writeTo(out);
704+
writer.write(out, node);
697705
}
698706
}
699707

server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public List<NodeAllocationResult> getNodeDecisions() {
107107

108108
@Override
109109
public void writeTo(StreamOutput out) throws IOException {
110-
out.writeOptionalWriteable(targetNode);
110+
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode);
111111
if (nodeDecisions != null) {
112112
out.writeBoolean(true);
113113
out.writeList(nodeDecisions);

server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException {
104104

105105
@Override
106106
public void writeTo(StreamOutput out) throws IOException {
107-
node.writeTo(out);
107+
node.writeToWithAttribute(out);
108108
out.writeOptionalWriteable(shardStoreInfo);
109109
out.writeOptionalWriteable(canAllocateDecision);
110110
nodeDecision.writeTo(out);

server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() {
8888

8989
@Override
9090
public InputStream serialize() throws IOException {
91-
return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput();
91+
return DISCOVERY_NODES_FORMAT.serialize(
92+
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
93+
discoveryNodes,
94+
generateBlobFileName(),
95+
getCompressor()
96+
).streamInput();
9297
}
9398

9499
@Override

server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
294294
logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
295295
}
296296
}
297+
297298
transportService.sendRequest(
298299
startRequest.sourceNode(),
299300
actionName,

server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException {
144144
out.writeLong(recoveryId);
145145
shardId.writeTo(out);
146146
out.writeString(targetAllocationId);
147-
sourceNode.writeTo(out);
148-
targetNode.writeTo(out);
147+
sourceNode.writeToWithAttribute(out);
148+
targetNode.writeToWithAttribute(out);
149149
metadataSnapshot.writeTo(out);
150150
out.writeBoolean(primaryRelocation);
151151
out.writeLong(startingSeqNo);

server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.core.common.io.stream.StreamInput;
2929
import org.opensearch.core.common.io.stream.StreamOutput;
3030
import org.opensearch.core.common.io.stream.Writeable;
31+
import org.opensearch.core.common.io.stream.Writeable.Writer;
3132
import org.opensearch.core.compress.Compressor;
3233
import org.opensearch.core.compress.CompressorRegistry;
3334
import org.opensearch.gateway.CorruptStateException;
@@ -56,6 +57,10 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput
5657
}
5758

5859
public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
60+
return serialize((out, unSerializedObj) -> unSerializedObj.writeTo(out), obj, blobName, compressor);
61+
}
62+
63+
public BytesReference serialize(final Writer<T> writer, T obj, final String blobName, final Compressor compressor) throws IOException {
5964
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
6065
try (
6166
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
@@ -76,7 +81,7 @@ public void close() throws IOException {
7681
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
7782
// TODO The stream version should be configurable
7883
stream.setVersion(Version.CURRENT);
79-
obj.writeTo(stream);
84+
writer.write(stream, obj);
8085
}
8186
CodecUtil.writeFooter(indexOutput);
8287
}

server/src/main/java/org/opensearch/transport/TransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ public HandshakeResponse(StreamInput in) throws IOException {
752752

753753
@Override
754754
public void writeTo(StreamOutput out) throws IOException {
755-
out.writeOptionalWriteable(discoveryNode);
755+
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode);
756756
clusterName.writeTo(out);
757757
out.writeVersion(version);
758758
}

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr
140140
DiscoveryNodes discoveryNodes = getDiscoveryNodes();
141141
String fileName = randomAlphaOfLength(10);
142142
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
143-
DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput()
143+
DISCOVERY_NODES_FORMAT.serialize(
144+
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
145+
discoveryNodes,
146+
fileName,
147+
compressor
148+
).streamInput()
144149
);
145150
RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor);
146151
CountDownLatch latch = new CountDownLatch(1);

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1220,7 +1220,12 @@ public void testGetClusterStateUsingDiff() throws IOException {
12201220
diffManifestBuilder.discoveryNodesUpdated(true);
12211221
manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME));
12221222
when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> {
1223-
BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(nodesBuilder.build(), DISCOVERY_NODES_FILENAME, compressor);
1223+
BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(
1224+
(out, nodes) -> nodes.writeToWithAttribute(out),
1225+
nodesBuilder.build(),
1226+
DISCOVERY_NODES_FILENAME,
1227+
compressor
1228+
);
12241229
return new ByteArrayInputStream(bytes.streamInput().readAllBytes());
12251230
});
12261231
}

server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void testSerDe() throws IOException {
143143
public void testExceptionDuringSerialization() throws IOException {
144144
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
145145
RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor);
146-
doThrow(new IOException("mock-exception")).when(nodes).writeTo(any());
146+
doThrow(new IOException("mock-exception")).when(nodes).writeToWithAttribute(any());
147147
IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize);
148148
}
149149

server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,20 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {
3535

3636
public void testSerDe() throws IOException {
3737
IndexMetadata indexMetadata = getIndexMetadata();
38-
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
38+
BytesReference bytesReference = clusterBlocksFormat.serialize(
39+
(out, metadata) -> metadata.writeTo(out),
40+
indexMetadata,
41+
TEST_BLOB_FILE_NAME,
42+
CompressorRegistry.none()
43+
);
3944
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference);
4045
assertThat(readIndexMetadata, is(indexMetadata));
4146
}
4247

4348
public void testSerDeForCompressed() throws IOException {
4449
IndexMetadata indexMetadata = getIndexMetadata();
4550
BytesReference bytesReference = clusterBlocksFormat.serialize(
51+
(out, metadata) -> metadata.writeTo(out),
4652
indexMetadata,
4753
TEST_BLOB_FILE_NAME,
4854
CompressorRegistry.getCompressor(DeflateCompressor.NAME)

0 commit comments

Comments
 (0)