Skip to content

Commit 67eceaa

Browse files
authored
Serializing node attribute in discoveryNode only in scenarioes where it is required (#15617)
Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
1 parent bd57e5d commit 67eceaa

File tree

20 files changed

+91
-25
lines changed

20 files changed

+91
-25
lines changed

libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
969969
}
970970

971971
public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
972+
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
973+
}
974+
975+
public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
972976
if (writeable != null) {
973977
writeBoolean(true);
974-
writeable.writeTo(this);
978+
writer.write(this, writeable);
975979
} else {
976980
writeBoolean(false);
977981
}

server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
9595
@Override
9696
public void writeTo(StreamOutput out) throws IOException {
9797
shardRouting.writeTo(out);
98-
out.writeOptionalWriteable(currentNode);
98+
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
9999
out.writeOptionalWriteable(relocationTargetNode);
100100
out.writeOptionalWriteable(clusterInfo);
101101
shardAllocationDecision.writeTo(out);

server/src/main/java/org/opensearch/action/support/nodes/BaseNodeResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,6 @@ public DiscoveryNode getNode() {
6767

6868
@Override
6969
public void writeTo(StreamOutput out) throws IOException {
70-
node.writeTo(out);
70+
node.writeToWithAttribute(out);
7171
}
7272
}

server/src/main/java/org/opensearch/cluster/ClusterState.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
781781
out.writeString(stateUUID);
782782
metadata.writeTo(out);
783783
routingTable.writeTo(out);
784-
nodes.writeTo(out);
784+
nodes.writeToWithAttribute(out);
785785
blocks.writeTo(out);
786786
// filter out custom states not supported by the other node
787787
int numberOfCustoms = 0;
@@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException {
859859
out.writeString(toUuid);
860860
out.writeLong(toVersion);
861861
routingTable.writeTo(out);
862-
nodes.writeTo(out);
862+
nodesWriteToWithAttributes(nodes, out);
863863
metadata.writeTo(out);
864864
blocks.writeTo(out);
865865
customs.writeTo(out);
866866
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
867867
}
868868

869+
private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
870+
DiscoveryNodes part = nodes.apply(null);
871+
if (part != null) {
872+
out.writeBoolean(true);
873+
part.writeToWithAttribute(out);
874+
} else {
875+
out.writeBoolean(false);
876+
}
877+
}
878+
869879
@Override
870880
public ClusterState apply(ClusterState state) {
871881
Builder builder = new Builder(clusterName);

server/src/main/java/org/opensearch/cluster/coordination/Join.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {
7878

7979
@Override
8080
public void writeTo(StreamOutput out) throws IOException {
81-
sourceNode.writeTo(out);
82-
targetNode.writeTo(out);
81+
sourceNode.writeToWithAttribute(out);
82+
targetNode.writeToWithAttribute(out);
8383
out.writeLong(term);
8484
out.writeLong(lastAcceptedTerm);
8585
out.writeLong(lastAcceptedVersion);

server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public JoinRequest(StreamInput in) throws IOException {
8989
@Override
9090
public void writeTo(StreamOutput out) throws IOException {
9191
super.writeTo(out);
92-
sourceNode.writeTo(out);
92+
sourceNode.writeToWithAttribute(out);
9393
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_7_0)) {
9494
out.writeLong(minimumTerm);
9595
}

server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
6464
@Override
6565
public void writeTo(StreamOutput out) throws IOException {
6666
super.writeTo(out);
67-
sourceNode.writeTo(out);
67+
sourceNode.writeToWithAttribute(out);
6868
out.writeLong(term);
6969
}
7070

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -385,17 +385,34 @@ public DiscoveryNode(StreamInput in) throws IOException {
385385

386386
@Override
387387
public void writeTo(StreamOutput out) throws IOException {
388+
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
389+
writeToUtil(out, false);
390+
} else {
391+
writeToUtil(out, true);
392+
}
393+
}
394+
395+
public void writeToWithAttribute(StreamOutput out) throws IOException {
396+
writeToUtil(out, true);
397+
}
398+
399+
public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
388400
out.writeString(nodeName);
389401
out.writeString(nodeId);
390402
out.writeString(ephemeralId);
391403
out.writeString(hostName);
392404
out.writeString(hostAddress);
393405
address.writeTo(out);
394-
out.writeVInt(attributes.size());
395-
for (Map.Entry<String, String> entry : attributes.entrySet()) {
396-
out.writeString(entry.getKey());
397-
out.writeString(entry.getValue());
406+
if (includeAllAttributes) {
407+
out.writeVInt(attributes.size());
408+
for (Map.Entry<String, String> entry : attributes.entrySet()) {
409+
out.writeString(entry.getKey());
410+
out.writeString(entry.getValue());
411+
}
412+
} else {
413+
out.writeVInt(0);
398414
}
415+
399416
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_3_0)) {
400417
out.writeVInt(roles.size());
401418
for (final DiscoveryNodeRole role : roles) {

server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,14 @@ public String shortSummary() {
701701

702702
@Override
703703
public void writeTo(StreamOutput out) throws IOException {
704+
writeToUtil((output, value) -> value.writeTo(output), out);
705+
}
706+
707+
public void writeToWithAttribute(StreamOutput out) throws IOException {
708+
writeToUtil((output, value) -> value.writeToWithAttribute(output), out);
709+
}
710+
711+
private void writeToUtil(final Writer<DiscoveryNode> writer, StreamOutput out) throws IOException {
704712
if (clusterManagerNodeId == null) {
705713
out.writeBoolean(false);
706714
} else {
@@ -709,7 +717,7 @@ public void writeTo(StreamOutput out) throws IOException {
709717
}
710718
out.writeVInt(nodes.size());
711719
for (DiscoveryNode node : this) {
712-
node.writeTo(out);
720+
writer.write(out, node);
713721
}
714722
}
715723

server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public List<NodeAllocationResult> getNodeDecisions() {
107107

108108
@Override
109109
public void writeTo(StreamOutput out) throws IOException {
110-
out.writeOptionalWriteable(targetNode);
110+
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode);
111111
if (nodeDecisions != null) {
112112
out.writeBoolean(true);
113113
out.writeList(nodeDecisions);

server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException {
104104

105105
@Override
106106
public void writeTo(StreamOutput out) throws IOException {
107-
node.writeTo(out);
107+
node.writeToWithAttribute(out);
108108
out.writeOptionalWriteable(shardStoreInfo);
109109
out.writeOptionalWriteable(canAllocateDecision);
110110
nodeDecision.writeTo(out);

server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() {
8888

8989
@Override
9090
public InputStream serialize() throws IOException {
91-
return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput();
91+
return DISCOVERY_NODES_FORMAT.serialize(
92+
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
93+
discoveryNodes,
94+
generateBlobFileName(),
95+
getCompressor()
96+
).streamInput();
9297
}
9398

9499
@Override

server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
295295
logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
296296
}
297297
}
298+
298299
transportService.sendRequest(
299300
startRequest.sourceNode(),
300301
actionName,

server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException {
144144
out.writeLong(recoveryId);
145145
shardId.writeTo(out);
146146
out.writeString(targetAllocationId);
147-
sourceNode.writeTo(out);
148-
targetNode.writeTo(out);
147+
sourceNode.writeToWithAttribute(out);
148+
targetNode.writeToWithAttribute(out);
149149
metadataSnapshot.writeTo(out);
150150
out.writeBoolean(primaryRelocation);
151151
out.writeLong(startingSeqNo);

server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.core.common.io.stream.StreamInput;
2929
import org.opensearch.core.common.io.stream.StreamOutput;
3030
import org.opensearch.core.common.io.stream.Writeable;
31+
import org.opensearch.core.common.io.stream.Writeable.Writer;
3132
import org.opensearch.core.compress.Compressor;
3233
import org.opensearch.core.compress.CompressorRegistry;
3334
import org.opensearch.gateway.CorruptStateException;
@@ -56,6 +57,10 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput
5657
}
5758

5859
public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
60+
return serialize((out, unSerializedObj) -> unSerializedObj.writeTo(out), obj, blobName, compressor);
61+
}
62+
63+
public BytesReference serialize(final Writer<T> writer, T obj, final String blobName, final Compressor compressor) throws IOException {
5964
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
6065
try (
6166
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
@@ -76,7 +81,7 @@ public void close() throws IOException {
7681
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
7782
// TODO The stream version should be configurable
7883
stream.setVersion(Version.CURRENT);
79-
obj.writeTo(stream);
84+
writer.write(stream, obj);
8085
}
8186
CodecUtil.writeFooter(indexOutput);
8287
}

server/src/main/java/org/opensearch/transport/TransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ public HandshakeResponse(StreamInput in) throws IOException {
759759

760760
@Override
761761
public void writeTo(StreamOutput out) throws IOException {
762-
out.writeOptionalWriteable(discoveryNode);
762+
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode);
763763
clusterName.writeTo(out);
764764
if (out.getVersion().before(Version.V_1_0_0)) {
765765
out.writeVersion(LegacyESVersion.V_7_10_2);

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr
140140
DiscoveryNodes discoveryNodes = getDiscoveryNodes();
141141
String fileName = randomAlphaOfLength(10);
142142
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
143-
DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput()
143+
DISCOVERY_NODES_FORMAT.serialize(
144+
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
145+
discoveryNodes,
146+
fileName,
147+
compressor
148+
).streamInput()
144149
);
145150
RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor);
146151
CountDownLatch latch = new CountDownLatch(1);

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,12 @@ public void testGetClusterStateUsingDiff() throws IOException {
11861186
diffManifestBuilder.discoveryNodesUpdated(true);
11871187
manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME));
11881188
when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> {
1189-
BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(nodesBuilder.build(), DISCOVERY_NODES_FILENAME, compressor);
1189+
BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(
1190+
(out, nodes) -> nodes.writeToWithAttribute(out),
1191+
nodesBuilder.build(),
1192+
DISCOVERY_NODES_FILENAME,
1193+
compressor
1194+
);
11901195
return new ByteArrayInputStream(bytes.streamInput().readAllBytes());
11911196
});
11921197
}

server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void testSerDe() throws IOException {
143143
public void testExceptionDuringSerialization() throws IOException {
144144
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
145145
RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor);
146-
doThrow(new IOException("mock-exception")).when(nodes).writeTo(any());
146+
doThrow(new IOException("mock-exception")).when(nodes).writeToWithAttribute(any());
147147
IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize);
148148
}
149149

server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,20 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {
3535

3636
public void testSerDe() throws IOException {
3737
IndexMetadata indexMetadata = getIndexMetadata();
38-
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
38+
BytesReference bytesReference = clusterBlocksFormat.serialize(
39+
(out, metadata) -> metadata.writeTo(out),
40+
indexMetadata,
41+
TEST_BLOB_FILE_NAME,
42+
CompressorRegistry.none()
43+
);
3944
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference);
4045
assertThat(readIndexMetadata, is(indexMetadata));
4146
}
4247

4348
public void testSerDeForCompressed() throws IOException {
4449
IndexMetadata indexMetadata = getIndexMetadata();
4550
BytesReference bytesReference = clusterBlocksFormat.serialize(
51+
(out, metadata) -> metadata.writeTo(out),
4652
indexMetadata,
4753
TEST_BLOB_FILE_NAME,
4854
CompressorRegistry.getCompressor(DeflateCompressor.NAME)

0 commit comments

Comments
 (0)