|
| 1 | +/* |
| 2 | + * SPDX-License-Identifier: Apache-2.0 |
| 3 | + * |
| 4 | + * The OpenSearch Contributors require contributions made to |
| 5 | + * this file be licensed under the Apache-2.0 license or a |
| 6 | + * compatible open source license. |
| 7 | + */ |
| 8 | + |
| 9 | +package org.opensearch.cluster.routing.remote; |
| 10 | + |
| 11 | +import org.apache.logging.log4j.LogManager; |
| 12 | +import org.apache.logging.log4j.Logger; |
| 13 | +import org.apache.lucene.store.IndexInput; |
| 14 | +import org.opensearch.action.LatchedActionListener; |
| 15 | +import org.opensearch.cluster.ClusterState; |
| 16 | +import org.opensearch.cluster.DiffableUtils; |
| 17 | +import org.opensearch.cluster.routing.IndexRoutingTable; |
| 18 | +import org.opensearch.cluster.routing.RoutingTable; |
| 19 | +import org.opensearch.common.CheckedRunnable; |
| 20 | +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; |
| 21 | +import org.opensearch.common.blobstore.BlobContainer; |
| 22 | +import org.opensearch.common.blobstore.BlobPath; |
| 23 | +import org.opensearch.common.blobstore.stream.write.WritePriority; |
| 24 | +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; |
| 25 | +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; |
| 26 | +import org.opensearch.common.io.stream.BytesStreamOutput; |
| 27 | +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; |
| 28 | +import org.opensearch.common.lucene.store.ByteArrayIndexInput; |
| 29 | +import org.opensearch.common.settings.ClusterSettings; |
| 30 | +import org.opensearch.common.settings.Setting; |
| 31 | +import org.opensearch.common.settings.Settings; |
| 32 | +import org.opensearch.common.util.io.IOUtils; |
| 33 | +import org.opensearch.core.action.ActionListener; |
| 34 | +import org.opensearch.core.common.bytes.BytesReference; |
| 35 | +import org.opensearch.gateway.remote.ClusterMetadataManifest; |
| 36 | +import org.opensearch.gateway.remote.RemoteClusterStateService; |
| 37 | +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; |
| 38 | +import org.opensearch.index.remote.RemoteStoreEnums; |
| 39 | +import org.opensearch.index.remote.RemoteStorePathStrategy; |
| 40 | +import org.opensearch.index.remote.RemoteStoreUtils; |
| 41 | +import org.opensearch.node.Node; |
| 42 | +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; |
| 43 | +import org.opensearch.repositories.RepositoriesService; |
| 44 | +import org.opensearch.repositories.Repository; |
| 45 | +import org.opensearch.repositories.blobstore.BlobStoreRepository; |
| 46 | + |
| 47 | +import java.io.IOException; |
| 48 | +import java.util.ArrayList; |
| 49 | +import java.util.List; |
| 50 | +import java.util.Map; |
| 51 | +import java.util.function.Function; |
| 52 | +import java.util.function.Supplier; |
| 53 | +import java.util.stream.Collectors; |
| 54 | + |
| 55 | +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; |
| 56 | + |
| 57 | +/** |
| 58 | + * A Service which provides APIs to upload and download routing table from remote store. |
| 59 | + * |
| 60 | + * @opensearch.internal |
| 61 | + */ |
| 62 | +public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { |
| 63 | + |
| 64 | + /** |
| 65 | + * This setting is used to set the remote routing table store blob store path type strategy. |
| 66 | + */ |
| 67 | + public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>( |
| 68 | + "cluster.remote_store.routing_table.path_type", |
| 69 | + RemoteStoreEnums.PathType.HASHED_PREFIX.toString(), |
| 70 | + RemoteStoreEnums.PathType::parseString, |
| 71 | + Setting.Property.NodeScope, |
| 72 | + Setting.Property.Dynamic |
| 73 | + ); |
| 74 | + |
| 75 | + /** |
| 76 | + * This setting is used to set the remote routing table store blob store path hash algorithm strategy. |
| 77 | + * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING} |
| 78 | + * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}. |
| 79 | + */ |
| 80 | + public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>( |
| 81 | + "cluster.remote_store.routing_table.path_hash_algo", |
| 82 | + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(), |
| 83 | + RemoteStoreEnums.PathHashAlgorithm::parseString, |
| 84 | + Setting.Property.NodeScope, |
| 85 | + Setting.Property.Dynamic |
| 86 | + ); |
| 87 | + |
| 88 | + public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; |
| 89 | + public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; |
| 90 | + public static final String DELIMITER = "__"; |
| 91 | + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; |
| 92 | + |
| 93 | + private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); |
| 94 | + private final Settings settings; |
| 95 | + private final Supplier<RepositoriesService> repositoriesService; |
| 96 | + private BlobStoreRepository blobStoreRepository; |
| 97 | + private RemoteStoreEnums.PathType pathType; |
| 98 | + private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; |
| 99 | + |
| 100 | + public InternalRemoteRoutingTableService( |
| 101 | + Supplier<RepositoriesService> repositoriesService, |
| 102 | + Settings settings, |
| 103 | + ClusterSettings clusterSettings |
| 104 | + ) { |
| 105 | + assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; |
| 106 | + this.repositoriesService = repositoriesService; |
| 107 | + this.settings = settings; |
| 108 | + this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING); |
| 109 | + this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); |
| 110 | + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); |
| 111 | + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); |
| 112 | + } |
| 113 | + |
| 114 | + private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { |
| 115 | + this.pathType = pathType; |
| 116 | + } |
| 117 | + |
| 118 | + private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) { |
| 119 | + this.pathHashAlgo = pathHashAlgo; |
| 120 | + } |
| 121 | + |
| 122 | + public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) { |
| 123 | + return new ArrayList<>(routingTable.indicesRouting().values()); |
| 124 | + } |
| 125 | + |
| 126 | + /** |
| 127 | + * Returns diff between the two routing tables, which includes upserts and deletes. |
| 128 | + * @param before previous routing table |
| 129 | + * @param after current routing table |
| 130 | + * @return diff of the previous and current routing table |
| 131 | + */ |
| 132 | + public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff( |
| 133 | + RoutingTable before, |
| 134 | + RoutingTable after |
| 135 | + ) { |
| 136 | + return DiffableUtils.diff( |
| 137 | + before.getIndicesRouting(), |
| 138 | + after.getIndicesRouting(), |
| 139 | + DiffableUtils.getStringKeySerializer(), |
| 140 | + CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER |
| 141 | + ); |
| 142 | + } |
| 143 | + |
| 144 | + /** |
| 145 | + * Create async action for writing one {@code IndexRoutingTable} to remote store |
| 146 | + * @param clusterState current cluster state |
| 147 | + * @param indexRouting indexRoutingTable to write to remote store |
| 148 | + * @param latchedActionListener listener for handling async action response |
| 149 | + * @param clusterBasePath base path for remote file |
| 150 | + * @return returns runnable async action |
| 151 | + */ |
| 152 | + public CheckedRunnable<IOException> getIndexRoutingAsyncAction( |
| 153 | + ClusterState clusterState, |
| 154 | + IndexRoutingTable indexRouting, |
| 155 | + LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener, |
| 156 | + BlobPath clusterBasePath |
| 157 | + ) { |
| 158 | + |
| 159 | + BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN); |
| 160 | + BlobPath path = pathType.path( |
| 161 | + RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(), |
| 162 | + pathHashAlgo |
| 163 | + ); |
| 164 | + final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path); |
| 165 | + |
| 166 | + final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version()); |
| 167 | + |
| 168 | + ActionListener<Void> completionListener = ActionListener.wrap( |
| 169 | + resp -> latchedActionListener.onResponse( |
| 170 | + new ClusterMetadataManifest.UploadedIndexMetadata( |
| 171 | + indexRouting.getIndex().getName(), |
| 172 | + indexRouting.getIndex().getUUID(), |
| 173 | + path.buildAsString() + fileName, |
| 174 | + INDEX_ROUTING_METADATA_PREFIX |
| 175 | + ) |
| 176 | + ), |
| 177 | + ex -> latchedActionListener.onFailure( |
| 178 | + new RemoteClusterStateService.RemoteStateTransferException( |
| 179 | + "Exception in writing index to remote store: " + indexRouting.getIndex().toString(), |
| 180 | + ex |
| 181 | + ) |
| 182 | + ) |
| 183 | + ); |
| 184 | + |
| 185 | + return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener); |
| 186 | + } |
| 187 | + |
| 188 | + /** |
| 189 | + * Combines IndicesRoutingMetadata from previous manifest and current uploaded indices, removes deleted indices. |
| 190 | + * @param previousManifest previous manifest, used to get all existing indices routing paths |
| 191 | + * @param indicesRoutingUploaded current uploaded indices routings |
| 192 | + * @param indicesRoutingToDelete indices to delete |
| 193 | + * @return combined list of metadata |
| 194 | + */ |
| 195 | + public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting( |
| 196 | + ClusterMetadataManifest previousManifest, |
| 197 | + List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded, |
| 198 | + List<String> indicesRoutingToDelete |
| 199 | + ) { |
| 200 | + final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting() |
| 201 | + .stream() |
| 202 | + .collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity())); |
| 203 | + |
| 204 | + indicesRoutingUploaded.forEach( |
| 205 | + uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting) |
| 206 | + ); |
| 207 | + indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove); |
| 208 | + |
| 209 | + return new ArrayList<>(allUploadedIndicesRouting.values()); |
| 210 | + } |
| 211 | + |
| 212 | + private void uploadIndex( |
| 213 | + IndexRoutingTable indexRouting, |
| 214 | + String fileName, |
| 215 | + BlobContainer blobContainer, |
| 216 | + ActionListener<Void> completionListener |
| 217 | + ) { |
| 218 | + RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting); |
| 219 | + BytesReference bytesInput = null; |
| 220 | + try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { |
| 221 | + indexRoutingInput.writeTo(streamOutput); |
| 222 | + bytesInput = streamOutput.bytes(); |
| 223 | + } catch (IOException e) { |
| 224 | + logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e); |
| 225 | + completionListener.onFailure(e); |
| 226 | + return; |
| 227 | + } |
| 228 | + |
| 229 | + if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { |
| 230 | + try { |
| 231 | + blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true); |
| 232 | + completionListener.onResponse(null); |
| 233 | + } catch (IOException e) { |
| 234 | + logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); |
| 235 | + completionListener.onFailure(e); |
| 236 | + } |
| 237 | + return; |
| 238 | + } |
| 239 | + |
| 240 | + try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) { |
| 241 | + try ( |
| 242 | + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( |
| 243 | + fileName, |
| 244 | + fileName, |
| 245 | + input.length(), |
| 246 | + true, |
| 247 | + WritePriority.URGENT, |
| 248 | + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), |
| 249 | + null, |
| 250 | + false |
| 251 | + ) |
| 252 | + ) { |
| 253 | + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload( |
| 254 | + remoteTransferContainer.createWriteContext(), |
| 255 | + completionListener |
| 256 | + ); |
| 257 | + } catch (IOException e) { |
| 258 | + logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); |
| 259 | + completionListener.onFailure(e); |
| 260 | + } |
| 261 | + } catch (IOException e) { |
| 262 | + logger.error( |
| 263 | + "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]", |
| 264 | + indexRouting, |
| 265 | + e |
| 266 | + ); |
| 267 | + completionListener.onFailure(e); |
| 268 | + } |
| 269 | + } |
| 270 | + |
| 271 | + private String getIndexRoutingFileName(long term, long version) { |
| 272 | + return String.join( |
| 273 | + DELIMITER, |
| 274 | + INDEX_ROUTING_FILE_PREFIX, |
| 275 | + RemoteStoreUtils.invertLong(term), |
| 276 | + RemoteStoreUtils.invertLong(version), |
| 277 | + RemoteStoreUtils.invertLong(System.currentTimeMillis()) |
| 278 | + ); |
| 279 | + } |
| 280 | + |
| 281 | + @Override |
| 282 | + protected void doClose() throws IOException { |
| 283 | + if (blobStoreRepository != null) { |
| 284 | + IOUtils.close(blobStoreRepository); |
| 285 | + } |
| 286 | + } |
| 287 | + |
| 288 | + @Override |
| 289 | + protected void doStart() { |
| 290 | + assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; |
| 291 | + final String remoteStoreRepo = settings.get( |
| 292 | + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY |
| 293 | + ); |
| 294 | + assert remoteStoreRepo != null : "Remote routing table repository is not configured"; |
| 295 | + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); |
| 296 | + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; |
| 297 | + blobStoreRepository = (BlobStoreRepository) repository; |
| 298 | + } |
| 299 | + |
| 300 | + @Override |
| 301 | + protected void doStop() {} |
| 302 | + |
| 303 | +} |
0 commit comments