Skip to content

Commit 11fd0dd

Browse files
authored
[Remote State] Create interface RemoteEntitiesManager (#14671) (#14854)
* Create interface RemoteEntitiesManager Signed-off-by: Shivansh Arora <hishiv@amazon.com> (cherry picked from commit b585469)
1 parent 4c7d94c commit 11fd0dd

15 files changed

+839
-724
lines changed

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.opensearch.cluster.DiffableUtils;
1818
import org.opensearch.cluster.routing.IndexRoutingTable;
1919
import org.opensearch.cluster.routing.RoutingTable;
20-
import org.opensearch.common.CheckedRunnable;
2120
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
2221
import org.opensearch.common.blobstore.BlobContainer;
2322
import org.opensearch.common.blobstore.BlobPath;
@@ -150,14 +149,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
150149
}
151150

152151
/**
153-
* Create async action for writing one {@code IndexRoutingTable} to remote store
152+
* Async action for writing one {@code IndexRoutingTable} to remote store
154153
* @param clusterState current cluster state
155154
* @param indexRouting indexRoutingTable to write to remote store
156155
* @param latchedActionListener listener for handling async action response
157156
* @param clusterBasePath base path for remote file
158-
* @return returns runnable async action
159157
*/
160-
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
158+
@Override
159+
public void getIndexRoutingAsyncAction(
161160
ClusterState clusterState,
162161
IndexRoutingTable indexRouting,
163162
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
@@ -187,7 +186,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
187186
)
188187
);
189188

190-
return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
189+
uploadIndex(indexRouting, fileName, blobContainer, completionListener);
191190
}
192191

193192
/**
@@ -274,7 +273,7 @@ private void uploadIndex(
274273
}
275274

276275
@Override
277-
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
276+
public void getAsyncIndexRoutingReadAction(
278277
String uploadedFilename,
279278
Index index,
280279
LatchedActionListener<IndexRoutingTable> latchedActionListener
@@ -284,7 +283,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
284283
BlobContainer blobContainer = blobStoreRepository.blobStore()
285284
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));
286285

287-
return () -> readAsync(
286+
readAsync(
288287
blobContainer,
289288
blobFileName,
290289
index,

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.opensearch.cluster.DiffableUtils;
1414
import org.opensearch.cluster.routing.IndexRoutingTable;
1515
import org.opensearch.cluster.routing.RoutingTable;
16-
import org.opensearch.common.CheckedRunnable;
1716
import org.opensearch.common.blobstore.BlobPath;
1817
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
1918
import org.opensearch.core.index.Index;
@@ -42,14 +41,13 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
4241
}
4342

4443
@Override
45-
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
44+
public void getIndexRoutingAsyncAction(
4645
ClusterState clusterState,
4746
IndexRoutingTable indexRouting,
4847
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
4948
BlobPath clusterBasePath
5049
) {
5150
// noop
52-
return () -> {};
5351
}
5452

5553
@Override
@@ -63,13 +61,12 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
6361
}
6462

6563
@Override
66-
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
64+
public void getAsyncIndexRoutingReadAction(
6765
String uploadedFilename,
6866
Index index,
6967
LatchedActionListener<IndexRoutingTable> latchedActionListener
7068
) {
7169
// noop
72-
return () -> {};
7370
}
7471

7572
@Override

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.opensearch.cluster.DiffableUtils;
1414
import org.opensearch.cluster.routing.IndexRoutingTable;
1515
import org.opensearch.cluster.routing.RoutingTable;
16-
import org.opensearch.common.CheckedRunnable;
1716
import org.opensearch.common.blobstore.BlobPath;
1817
import org.opensearch.common.lifecycle.LifecycleComponent;
1918
import org.opensearch.core.common.io.stream.StreamInput;
@@ -46,7 +45,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
4645

4746
List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);
4847

49-
CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
48+
void getAsyncIndexRoutingReadAction(
5049
String uploadedFilename,
5150
Index index,
5251
LatchedActionListener<IndexRoutingTable> latchedActionListener
@@ -62,7 +61,7 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
6261
RoutingTable after
6362
);
6463

65-
CheckedRunnable<IOException> getIndexRoutingAsyncAction(
64+
void getIndexRoutingAsyncAction(
6665
ClusterState clusterState,
6766
IndexRoutingTable indexRouting,
6867
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
13+
import org.opensearch.gateway.remote.model.RemoteReadResult;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
/**
19+
* An abstract class that provides a base implementation for managing remote entities in the remote store.
20+
*/
21+
public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager {
22+
/**
23+
* A map that stores the remote writable entity stores, keyed by the entity type.
24+
*/
25+
protected final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores = new HashMap<>();
26+
27+
/**
28+
* Retrieves the remote writable entity store for the given entity.
29+
*
30+
* @param entity the entity for which the store is requested
31+
* @return the remote writable entity store for the given entity
32+
* @throws IllegalArgumentException if the entity type is unknown
33+
*/
34+
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
35+
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
36+
if (remoteStore == null) {
37+
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
38+
}
39+
return remoteStore;
40+
}
41+
42+
/**
43+
* Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener.
44+
*
45+
* @param component the component for which the write operation is performed
46+
* @param remoteEntity the remote object to be written
47+
* @param listener the listener to be notified when the write operation completes
48+
* @return an ActionListener for handling the write operation
49+
*/
50+
protected abstract ActionListener<Void> getWrappedWriteListener(
51+
String component,
52+
AbstractRemoteWritableBlobEntity remoteEntity,
53+
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
54+
);
55+
56+
/**
57+
* Returns an ActionListener for handling the read operation for the specified component,
58+
* remote object, and latched action listener.
59+
*
60+
* @param component the component for which the read operation is performed
61+
* @param remoteEntity the remote object to be read
62+
* @param listener the listener to be notified when the read operation completes
63+
* @return an ActionListener for handling the read operation
64+
*/
65+
protected abstract ActionListener<Object> getWrappedReadListener(
66+
String component,
67+
AbstractRemoteWritableBlobEntity remoteEntity,
68+
ActionListener<RemoteReadResult> listener
69+
);
70+
71+
@Override
72+
public void writeAsync(
73+
String component,
74+
AbstractRemoteWritableBlobEntity entity,
75+
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
76+
) {
77+
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
78+
}
79+
80+
@Override
81+
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
82+
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
83+
}
84+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
13+
import org.opensearch.gateway.remote.model.RemoteReadResult;
14+
15+
/**
16+
* The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store
17+
*/
18+
public interface RemoteWritableEntityManager {
19+
20+
/**
21+
* Performs an asynchronous read operation for the specified component and entity.
22+
*
23+
* @param component the component for which the read operation is performed
24+
* @param entity the entity to be read
25+
* @param listener the listener to be notified when the read operation completes.
26+
* The listener's {@link ActionListener#onResponse(Object)} method
27+
* is called with a {@link RemoteReadResult} object containing the
28+
* read data on successful read. The
29+
* {@link ActionListener#onFailure(Exception)} method is called with
30+
* an exception if the read operation fails.
31+
*/
32+
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);
33+
34+
/**
35+
* Performs an asynchronous write operation for the specified component and entity.
36+
*
37+
* @param component the component for which the write operation is performed
38+
* @param entity the entity to be written
39+
* @param listener the listener to be notified when the write operation completes.
40+
* The listener's {@link ActionListener#onResponse(Object)} method
41+
* is called with a {@link UploadedMetadata} object containing the
42+
* uploaded metadata on successful write. The
43+
* {@link ActionListener#onFailure(Exception)} method is called with
44+
* an exception if the write operation fails.
45+
*/
46+
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
47+
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@
88

99
package org.opensearch.gateway.remote;
1010

11-
import org.opensearch.action.LatchedActionListener;
1211
import org.opensearch.cluster.ClusterState;
1312
import org.opensearch.cluster.DiffableUtils;
1413
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
15-
import org.opensearch.common.CheckedRunnable;
1614
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
17-
import org.opensearch.common.remote.RemoteWritableEntityStore;
15+
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
1816
import org.opensearch.core.action.ActionListener;
1917
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
2018
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
@@ -26,23 +24,19 @@
2624
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2725
import org.opensearch.threadpool.ThreadPool;
2826

29-
import java.io.IOException;
3027
import java.util.Collections;
31-
import java.util.HashMap;
3228
import java.util.Map;
3329

3430
/**
3531
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
3632
*
3733
* @opensearch.internal
3834
*/
39-
public class RemoteClusterStateAttributesManager {
35+
public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableEntityManager {
4036
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
4137
public static final String DISCOVERY_NODES = "nodes";
4238
public static final String CLUSTER_BLOCKS = "blocks";
4339
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
44-
private final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores;
45-
private final NamedWriteableRegistry namedWriteableRegistry;
4640

4741
RemoteClusterStateAttributesManager(
4842
String clusterName,
@@ -51,8 +45,6 @@ public class RemoteClusterStateAttributesManager {
5145
NamedWriteableRegistry namedWriteableRegistry,
5246
ThreadPool threadpool
5347
) {
54-
this.namedWriteableRegistry = namedWriteableRegistry;
55-
this.remoteWritableEntityStores = new HashMap<>();
5648
this.remoteWritableEntityStores.put(
5749
RemoteDiscoveryNodes.DISCOVERY_NODES,
5850
new RemoteClusterStateBlobStore<>(
@@ -85,46 +77,28 @@ public class RemoteClusterStateAttributesManager {
8577
);
8678
}
8779

88-
/**
89-
* Allows async upload of Cluster State Attribute components to remote
90-
*/
91-
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
80+
@Override
81+
protected ActionListener<Void> getWrappedWriteListener(
9282
String component,
93-
AbstractRemoteWritableBlobEntity blobEntity,
94-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
95-
) {
96-
return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
97-
}
98-
99-
private ActionListener<Void> getActionListener(
100-
String component,
101-
AbstractRemoteWritableBlobEntity remoteObject,
102-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
83+
AbstractRemoteWritableBlobEntity remoteEntity,
84+
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
10385
) {
10486
return ActionListener.wrap(
105-
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
106-
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
87+
resp -> listener.onResponse(remoteEntity.getUploadedMetadata()),
88+
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex))
10789
);
10890
}
10991

110-
private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
111-
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
112-
if (remoteStore == null) {
113-
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
114-
}
115-
return remoteStore;
116-
}
117-
118-
public CheckedRunnable<IOException> getAsyncMetadataReadAction(
92+
@Override
93+
protected ActionListener<Object> getWrappedReadListener(
11994
String component,
120-
AbstractRemoteWritableBlobEntity blobEntity,
121-
LatchedActionListener<RemoteReadResult> listener
95+
AbstractRemoteWritableBlobEntity remoteEntity,
96+
ActionListener<RemoteReadResult> listener
12297
) {
123-
final ActionListener actionListener = ActionListener.wrap(
98+
return ActionListener.wrap(
12499
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
125-
listener::onFailure
100+
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
126101
);
127-
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
128102
}
129103

130104
public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
@@ -158,4 +132,5 @@ public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterSta
158132
NonDiffableValueSerializer.getAbstractInstance()
159133
);
160134
}
135+
161136
}

0 commit comments

Comments
 (0)