3232
3333package org .opensearch .repositories ;
3434
35+ import org .apache .lucene .store .RateLimiter ;
3536import org .opensearch .action .admin .cluster .repositories .get .GetRepositoriesResponse ;
3637import org .opensearch .cluster .metadata .RepositoryMetadata ;
3738import org .opensearch .common .settings .Settings ;
4243import org .opensearch .test .OpenSearchIntegTestCase ;
4344import org .opensearch .transport .client .Client ;
4445
46+ import java .nio .file .Path ;
4547import java .util .Collection ;
4648import java .util .Collections ;
4749import java .util .concurrent .atomic .AtomicInteger ;
4850
4951import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
5052import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
53+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC ;
54+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC ;
55+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_REMOTE_UPLOAD_BYTES_PER_SEC ;
56+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_RESTORE_BYTES_PER_SEC ;
57+ import static org .opensearch .repositories .blobstore .BlobStoreRepository .MAX_SNAPSHOT_BYTES_PER_SEC ;
5158import static org .hamcrest .Matchers .containsString ;
5259import static org .hamcrest .Matchers .equalTo ;
5360import static org .hamcrest .Matchers .hasSize ;
@@ -138,10 +145,11 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
138145
139146 // create repository
140147 final String repositoryName = "test-repo" ;
148+ Path path = randomRepoPath ();
141149 Settings .Builder repoSettings = Settings .builder ()
142- .put ("location" , randomRepoPath () )
143- .put ("max_snapshot_bytes_per_sec" , "10mb" )
144- .put ("max_restore_bytes_per_sec" , "10mb" );
150+ .put ("location" , path )
151+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , "10mb" )
152+ .put (MAX_RESTORE_BYTES_PER_SEC , "10mb" );
145153 OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
146154 client ().admin ().cluster (),
147155 repositoryName ,
@@ -176,7 +184,7 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
176184
177185 try {
178186 logger .info ("--> begin to reset repository" );
179- repoSettings = Settings .builder ().put ("location" , randomRepoPath ()).put ("max_snapshot_bytes_per_sec" , "300mb" );
187+ repoSettings = Settings .builder ().put ("location" , randomRepoPath ()).put (MAX_SNAPSHOT_BYTES_PER_SEC , "300mb" );
180188 OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
181189 client ().admin ().cluster (),
182190 repositoryName ,
@@ -194,4 +202,121 @@ public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws Interrupted
194202
195203 thread .join ();
196204 }
205+
206+ public void testAdjustBytesPerSecSettingForSnapAndRestore () {
207+ final InternalTestCluster cluster = internalCluster ();
208+ final RepositoriesService repositoriesService = cluster .getDataOrClusterManagerNodeInstances (RepositoriesService .class )
209+ .iterator ()
210+ .next ();
211+
212+ // create repository
213+ final String repositoryName = "test-repo1" ;
214+ long rateBytes = 200000 ;
215+ Path path = randomRepoPath ();
216+ Settings .Builder repoSettings = Settings .builder ()
217+ .put ("location" , path )
218+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , (rateBytes + "b" ))
219+ .put (MAX_RESTORE_BYTES_PER_SEC , (rateBytes + "b" ))
220+ .put (MAX_REMOTE_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
221+ .put (MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
222+ .put (MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC , (rateBytes + "b" ));
223+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
224+ client ().admin ().cluster (),
225+ repositoryName ,
226+ FsRepository .TYPE ,
227+ true ,
228+ repoSettings
229+ );
230+
231+ FsRepository repository = (FsRepository ) repositoriesService .repository (repositoryName );
232+ RateLimiter snapshotRateLimiter = repository .snapshotRateLimiter ();
233+ assertThat (snapshotRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
234+ RateLimiter restoreRateLimiter = repository .restoreRateLimiter ();
235+ assertThat (restoreRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
236+ RateLimiter remoteUploadRateLimiter = repository .remoteUploadRateLimiter ();
237+ assertThat (remoteUploadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
238+ RateLimiter remoteUploadLowPriorityRateLimiter = repository .remoteUploadLowPriorityRateLimiter ();
239+ assertThat (remoteUploadLowPriorityRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
240+ RateLimiter remoteDownloadRateLimiter = repository .remoteDownloadRateLimiter ();
241+ assertThat (remoteDownloadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
242+
243+ // adjust all the reloadable settings
244+ {
245+ rateBytes = rateBytes / 2 ;
246+ repoSettings = Settings .builder ()
247+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , (rateBytes + "b" ))
248+ .put (MAX_RESTORE_BYTES_PER_SEC , (rateBytes + "b" ))
249+ .put (MAX_REMOTE_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
250+ .put (MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
251+ .put (MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC , (rateBytes + "b" ));
252+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
253+ client ().admin ().cluster (),
254+ repositoryName ,
255+ FsRepository .TYPE ,
256+ true ,
257+ repoSettings
258+ );
259+ FsRepository newRepository = (FsRepository ) repositoriesService .repository (repositoryName );
260+ assertThat (newRepository , sameInstance (repository ));
261+ snapshotRateLimiter = newRepository .snapshotRateLimiter ();
262+ assertThat (snapshotRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
263+ restoreRateLimiter = newRepository .restoreRateLimiter ();
264+ assertThat (restoreRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
265+ remoteUploadRateLimiter = newRepository .remoteUploadRateLimiter ();
266+ assertThat (remoteUploadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
267+ remoteUploadLowPriorityRateLimiter = newRepository .remoteUploadLowPriorityRateLimiter ();
268+ assertThat (remoteUploadLowPriorityRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
269+ remoteDownloadRateLimiter = newRepository .remoteDownloadRateLimiter ();
270+ assertThat (remoteDownloadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
271+ }
272+
273+ // In addition to the settings in RELOADABLE_SETTINGS, all the new settings should be equal to current settings
274+ {
275+ long newRateBytes = rateBytes / 2 ;
276+ repoSettings = Settings .builder ()
277+ .put ("location" , path )
278+ .put (MAX_SNAPSHOT_BYTES_PER_SEC , (newRateBytes + "b" ))
279+ .put (MAX_RESTORE_BYTES_PER_SEC , (newRateBytes + "b" ));
280+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
281+ client ().admin ().cluster (),
282+ repositoryName ,
283+ FsRepository .TYPE ,
284+ true ,
285+ repoSettings
286+ );
287+ FsRepository newRepository = (FsRepository ) repositoriesService .repository (repositoryName );
288+ assertThat (newRepository , sameInstance (repository ));
289+ snapshotRateLimiter = newRepository .snapshotRateLimiter ();
290+ assertThat (snapshotRateLimiter .getMBPerSec (), equalTo ((double ) newRateBytes / (1024 * 1024 )));
291+ restoreRateLimiter = newRepository .restoreRateLimiter ();
292+ assertThat (restoreRateLimiter .getMBPerSec (), equalTo ((double ) newRateBytes / (1024 * 1024 )));
293+ remoteUploadRateLimiter = newRepository .remoteUploadRateLimiter ();
294+ assertThat (remoteUploadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
295+ remoteUploadLowPriorityRateLimiter = newRepository .remoteUploadLowPriorityRateLimiter ();
296+ assertThat (remoteUploadLowPriorityRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
297+ remoteDownloadRateLimiter = newRepository .remoteDownloadRateLimiter ();
298+ assertThat (remoteDownloadRateLimiter .getMBPerSec (), equalTo ((double ) rateBytes / (1024 * 1024 )));
299+ }
300+
301+ // the new settings are not all equal to the old settings, so the repository will be not reloaded
302+ {
303+ rateBytes = rateBytes / 2 ;
304+ repoSettings = Settings .builder ()
305+ .put ("location" , path )
306+ .put ("io_buffer_size" , "8mb" )
307+ .put (MAX_RESTORE_BYTES_PER_SEC , (rateBytes + "b" ))
308+ .put (MAX_REMOTE_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
309+ .put (MAX_REMOTE_LOW_PRIORITY_UPLOAD_BYTES_PER_SEC , (rateBytes + "b" ))
310+ .put (MAX_REMOTE_DOWNLOAD_BYTES_PER_SEC , (rateBytes + "b" ));
311+ OpenSearchIntegTestCase .putRepositoryWithNoSettingOverrides (
312+ client ().admin ().cluster (),
313+ repositoryName ,
314+ FsRepository .TYPE ,
315+ true ,
316+ repoSettings
317+ );
318+ FsRepository newRepository = (FsRepository ) repositoriesService .repository (repositoryName );
319+ assertNotEquals (newRepository , repository );
320+ }
321+ }
197322}
0 commit comments