Skip to content

Commit a6790cb

Browse files
opensearch-trigger-bot[bot]github-actions[bot]
authored andcommitted
Fix flakiness of testRemoteCleanupDeleteStale, bug fix in RemoteMetadataManifest and RemoteReadResult (opensearch-project#14230) (opensearch-project#14353)
* Address flakiness of testRemoteCleanupDeleteStale - Make RemoteReadResult have Object rather than ToXContent - Fix getManifestCodecVersion in RemoteClusterMetadataManifest (cherry picked from commit 0c2ff03) Signed-off-by: Shivansh Arora <hishiv@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent b748ea1 commit a6790cb

11 files changed

+186
-20
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ public void testRemoteCleanupDeleteStale() throws Exception {
108108
.add("cluster-state")
109109
.add(getClusterState().metadata().clusterUUID());
110110
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
111+
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
112+
RemoteClusterStateCleanupManager.class
113+
);
111114

112115
// set cleanup interval to 100 ms to make the test faster
113116
ClusterUpdateSettingsResponse response = client().admin()
@@ -117,6 +120,7 @@ public void testRemoteCleanupDeleteStale() throws Exception {
117120
.get();
118121

119122
assertTrue(response.isAcknowledged());
123+
assertBusy(() -> assertEquals(100, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis()));
120124

121125
assertBusy(() -> {
122126
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
@@ -128,7 +132,7 @@ public void testRemoteCleanupDeleteStale() throws Exception {
128132
"Current number of manifest files: " + manifestFiles,
129133
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
130134
);
131-
}, 500, TimeUnit.MILLISECONDS);
135+
});
132136

133137
// disable the clean up to avoid race condition during shutdown
134138
response = client().admin()

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.opensearch.common.remote.RemoteWritableEntityStore;
1616
import org.opensearch.core.action.ActionListener;
1717
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
18-
import org.opensearch.core.xcontent.ToXContent;
1918
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
2019
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
2120
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
@@ -121,7 +120,7 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
121120
LatchedActionListener<RemoteReadResult> listener
122121
) {
123122
final ActionListener actionListener = ActionListener.wrap(
124-
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
123+
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
125124
listener::onFailure
126125
);
127126
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
276276
ClusterState clusterState,
277277
ClusterMetadataManifest previousManifest
278278
) throws IOException {
279-
logger.info("WRITING INCREMENTAL STATE");
279+
logger.trace("WRITING INCREMENTAL STATE");
280280

281281
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
282282
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
@@ -766,7 +766,7 @@ private UploadedMetadataResults writeMetadataInParallel(
766766
throw new IllegalStateException("Unknown metadata component name " + name);
767767
}
768768
});
769-
logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString());
769+
logger.trace("response {}", response.uploadedIndicesRoutingMetadata.toString());
770770
return response;
771771
}
772772

server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
2626
import org.opensearch.core.compress.Compressor;
2727
import org.opensearch.core.xcontent.NamedXContentRegistry;
28-
import org.opensearch.core.xcontent.ToXContent;
2928
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
3029
import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata;
3130
import org.opensearch.gateway.remote.model.RemoteCustomMetadata;
@@ -194,7 +193,7 @@ CheckedRunnable<IOException> getAsyncMetadataReadAction(
194193
LatchedActionListener<RemoteReadResult> listener
195194
) {
196195
ActionListener actionListener = ActionListener.wrap(
197-
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, readEntity.getType(), componentName)),
196+
response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)),
198197
listener::onFailure
199198
);
200199
return () -> getStore(readEntity).readAsync(readEntity, actionListener);

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,17 @@ public ClusterMetadataManifest deserialize(final InputStream inputStream) throws
131131
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
132132
}
133133

134-
private int getManifestCodecVersion() {
134+
// package private for testing
135+
int getManifestCodecVersion() {
135136
assert blobName != null;
136-
String[] splitName = blobName.split(DELIMITER);
137+
String[] splitName = getBlobFileName().split(DELIMITER);
137138
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) {
138139
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
139140
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
140141
// is used.
141142
return ClusterMetadataManifest.CODEC_V0;
142143
} else {
143-
throw new IllegalArgumentException("Manifest file name is corrupted");
144+
throw new IllegalArgumentException("Manifest file name is corrupted : " + blobName);
144145
}
145146
}
146147

server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,22 @@
88

99
package org.opensearch.gateway.remote.model;
1010

11-
import org.opensearch.core.xcontent.ToXContent;
12-
1311
/**
1412
* Container class for entity read from remote store
1513
*/
1614
public class RemoteReadResult {
1715

18-
ToXContent obj;
16+
Object obj;
1917
String component;
2018
String componentName;
2119

22-
public RemoteReadResult(ToXContent obj, String component, String componentName) {
20+
public RemoteReadResult(Object obj, String component, String componentName) {
2321
this.obj = obj;
2422
this.component = component;
2523
this.componentName = componentName;
2624
}
2725

28-
public ToXContent getObj() {
26+
public Object getObj() {
2927
return obj;
3028
}
3129

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.action.LatchedActionListener;
12+
import org.opensearch.cluster.block.ClusterBlocks;
13+
import org.opensearch.cluster.node.DiscoveryNodes;
14+
import org.opensearch.common.CheckedRunnable;
15+
import org.opensearch.common.settings.ClusterSettings;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
19+
import org.opensearch.core.compress.Compressor;
20+
import org.opensearch.core.compress.NoneCompressor;
21+
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
22+
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
23+
import org.opensearch.gateway.remote.model.RemoteReadResult;
24+
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
25+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
26+
import org.opensearch.test.OpenSearchTestCase;
27+
import org.opensearch.threadpool.TestThreadPool;
28+
import org.opensearch.threadpool.ThreadPool;
29+
import org.junit.After;
30+
import org.junit.Assert;
31+
import org.junit.Before;
32+
33+
import java.io.IOException;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
37+
import static java.util.Collections.emptyList;
38+
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
39+
import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS;
40+
import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT;
41+
import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks;
42+
import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES_FORMAT;
43+
import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodesTests.getDiscoveryNodes;
44+
import static org.mockito.ArgumentMatchers.anyIterable;
45+
import static org.mockito.ArgumentMatchers.anyString;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
48+
49+
public class RemoteClusterStateAttributesManagerTests extends OpenSearchTestCase {
50+
private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager;
51+
private BlobStoreTransferService blobStoreTransferService;
52+
private BlobStoreRepository blobStoreRepository;
53+
private Compressor compressor;
54+
private ThreadPool threadpool = new TestThreadPool(RemoteClusterStateAttributesManagerTests.class.getName());
55+
56+
@Before
57+
public void setup() throws Exception {
58+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
59+
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(emptyList());
60+
blobStoreRepository = mock(BlobStoreRepository.class);
61+
blobStoreTransferService = mock(BlobStoreTransferService.class);
62+
compressor = new NoneCompressor();
63+
64+
remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(
65+
"test-cluster",
66+
blobStoreRepository,
67+
blobStoreTransferService,
68+
namedWriteableRegistry,
69+
threadpool
70+
);
71+
}
72+
73+
@After
74+
public void tearDown() throws Exception {
75+
super.tearDown();
76+
threadpool.shutdown();
77+
}
78+
79+
public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException {
80+
DiscoveryNodes discoveryNodes = getDiscoveryNodes();
81+
String fileName = randomAlphaOfLength(10);
82+
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
83+
DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput()
84+
);
85+
RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor);
86+
CountDownLatch latch = new CountDownLatch(1);
87+
AtomicReference<DiscoveryNodes> readDiscoveryNodes = new AtomicReference<>();
88+
LatchedActionListener<RemoteReadResult> assertingListener = new LatchedActionListener<>(
89+
ActionListener.wrap(response -> readDiscoveryNodes.set((DiscoveryNodes) response.getObj()), Assert::assertNull),
90+
latch
91+
);
92+
CheckedRunnable<IOException> runnable = remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
93+
DISCOVERY_NODES,
94+
remoteObjForDownload,
95+
assertingListener
96+
);
97+
98+
try {
99+
runnable.run();
100+
latch.await();
101+
assertEquals(discoveryNodes.getSize(), readDiscoveryNodes.get().getSize());
102+
discoveryNodes.getNodes().forEach((nodeId, node) -> assertEquals(readDiscoveryNodes.get().get(nodeId), node));
103+
assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.get().getClusterManagerNodeId());
104+
} catch (Exception e) {
105+
throw new RuntimeException(e);
106+
}
107+
}
108+
109+
public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException {
110+
ClusterBlocks clusterBlocks = randomClusterBlocks();
111+
String fileName = randomAlphaOfLength(10);
112+
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
113+
CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, fileName, compressor).streamInput()
114+
);
115+
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor);
116+
CountDownLatch latch = new CountDownLatch(1);
117+
AtomicReference<ClusterBlocks> readClusterBlocks = new AtomicReference<>();
118+
LatchedActionListener<RemoteReadResult> assertingListener = new LatchedActionListener<>(
119+
ActionListener.wrap(response -> readClusterBlocks.set((ClusterBlocks) response.getObj()), Assert::assertNull),
120+
latch
121+
);
122+
123+
CheckedRunnable<IOException> runnable = remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
124+
CLUSTER_BLOCKS,
125+
remoteClusterBlocks,
126+
assertingListener
127+
);
128+
129+
try {
130+
runnable.run();
131+
latch.await();
132+
assertEquals(clusterBlocks.global(), readClusterBlocks.get().global());
133+
assertEquals(clusterBlocks.indices().keySet(), readClusterBlocks.get().indices().keySet());
134+
for (String index : clusterBlocks.indices().keySet()) {
135+
assertEquals(clusterBlocks.indices().get(index), readClusterBlocks.get().indices().get(index));
136+
}
137+
} catch (Exception e) {
138+
throw new RuntimeException(e);
139+
}
140+
}
141+
}

server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void testSerDe() throws IOException {
136136
}
137137
}
138138

139-
static ClusterBlocks randomClusterBlocks() {
139+
public static ClusterBlocks randomClusterBlocks() {
140140
ClusterBlocks.Builder builder = ClusterBlocks.builder();
141141
int randomGlobalBlocks = randomIntBetween(1, 10);
142142
for (int i = 0; i < randomGlobalBlocks; i++) {

server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.stream.Stream;
4242

4343
import static java.util.stream.Collectors.toList;
44+
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0;
45+
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
4446
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
4547
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
4648
import static org.hamcrest.Matchers.greaterThan;
@@ -236,6 +238,28 @@ public void testSerDe() throws IOException {
236238
assertThrows(IllegalArgumentException.class, () -> invalidRemoteObject.deserialize(new ByteArrayInputStream(new byte[0])));
237239
}
238240

241+
public void testGetManifestCodecVersion() {
242+
String manifestFileWithDelimiterInPath =
243+
"123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556__2";
244+
RemoteClusterMetadataManifest remoteManifestForDownload = new RemoteClusterMetadataManifest(
245+
manifestFileWithDelimiterInPath,
246+
clusterUUID,
247+
compressor,
248+
namedXContentRegistry
249+
);
250+
assertEquals(CODEC_V2, remoteManifestForDownload.getManifestCodecVersion());
251+
252+
String v0ManifestFileWithDelimiterInPath =
253+
"123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556";
254+
RemoteClusterMetadataManifest remoteManifestV0ForDownload = new RemoteClusterMetadataManifest(
255+
v0ManifestFileWithDelimiterInPath,
256+
clusterUUID,
257+
compressor,
258+
namedXContentRegistry
259+
);
260+
assertEquals(CODEC_V0, remoteManifestV0ForDownload.getManifestCodecVersion());
261+
}
262+
239263
private ClusterMetadataManifest getClusterMetadataManifest() {
240264
return ClusterMetadataManifest.builder()
241265
.opensearchVersion(Version.CURRENT)

server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustomsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,12 @@ public void testSerDe() throws IOException {
232232
try (InputStream inputStream = remoteObjectForUpload.serialize()) {
233233
remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath());
234234
assertThat(inputStream.available(), greaterThan(0));
235-
Custom readclusterStateCustoms = remoteObjectForUpload.deserialize(inputStream);
236-
assertThat(readclusterStateCustoms, is(clusterStateCustoms));
235+
Custom readClusterStateCustoms = remoteObjectForUpload.deserialize(inputStream);
236+
assertThat(readClusterStateCustoms, is(clusterStateCustoms));
237237
}
238238
}
239239

240-
private Custom getClusterStateCustom() {
240+
public static SnapshotsInProgress getClusterStateCustom() {
241241
return SnapshotsInProgress.of(
242242
List.of(
243243
new SnapshotsInProgress.Entry(

server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testExceptionDuringDeserialize() throws IOException {
156156
IOException ioe = assertThrows(IOException.class, () -> remoteObjectForDownload.deserialize(in));
157157
}
158158

159-
private DiscoveryNodes getDiscoveryNodes() {
159+
public static DiscoveryNodes getDiscoveryNodes() {
160160
return DiscoveryNodes.builder()
161161
.add(
162162
new DiscoveryNode(

0 commit comments

Comments
 (0)