 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
 import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.concurrent.AbstractAsyncTask;
+import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
 import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.index.IndexService;
 import org.opensearch.index.remote.RemoteSegmentTransferTracker;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.indices.IndicesService;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.snapshots.mockstore.MockRepository;
 import org.opensearch.test.OpenSearchIntegTestCase;
...
 import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
+public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
     public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
         // Here the doc size of the request remains the same throughout the test. After initial indexing, all remote store
         // interactions fail, causing the consecutive failure limit to be exceeded and writes to be rejected.
@@ -156,4 +162,70 @@ private String generateString(int sizeInBytes) {
         sb.append("}");
         return sb.toString();
     }
+
+    /**
+     * Fixes <a href="https://github.com/opensearch-project/OpenSearch/issues/10398">Github#10398</a>
+     */
+    public void testAsyncTrimTaskSucceeds() {
+        Path location = randomRepoPath().toAbsolutePath();
+        String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);
+
+        logger.info("Increasing the frequency of the async trim task to ensure it runs in the background while indexing");
+        IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
+        ((AbstractAsyncTask) indexService.getTrimTranslogTask()).setInterval(TimeValue.timeValueMillis(100));
+
+        logger.info("--> Indexing data");
+        indexData(randomIntBetween(2, 5), true);
+        logger.info("--> Indexing succeeded");
+
+        MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
+            .repository(TRANSLOG_REPOSITORY_NAME);
+        logger.info("--> Failing all remote store interactions");
+        translogRepo.setRandomControlIOExceptionRate(1d);
+
+        // While translog uploads fail, every indexing request is expected to fail
+        for (int i = 0; i < randomIntBetween(5, 10); i++) {
+            UncategorizedExecutionException exception = assertThrows(UncategorizedExecutionException.class, this::indexSingleDoc);
+            assertEquals("Failed execution", exception.getMessage());
+        }
+
+        translogRepo.setRandomControlIOExceptionRate(0d);
+        indexSingleDoc();
+        logger.info("Indexed single doc successfully");
+    }
+
+    /**
+     * Fixes <a href="https://github.com/opensearch-project/OpenSearch/issues/10400">Github#10400</a>
+     */
+    public void testSkipLoadGlobalCheckpointToReplicationTracker() {
+        Path location = randomRepoPath().toAbsolutePath();
+        String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);
+
+        logger.info("--> Indexing data");
+        indexData(randomIntBetween(1, 2), true);
+        logger.info("--> Indexing succeeded");
+
+        IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
+        IndexShard indexShard = indexService.getShard(0);
+        indexShard.failShard("failing shard", null);
+
+        ensureRed(INDEX_NAME);
+
+        MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
+            .repository(TRANSLOG_REPOSITORY_NAME);
+        logger.info("--> Failing all remote store interactions");
+        translogRepo.setRandomControlIOExceptionRate(1d);
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        // Cluster stays red as the remote interactions are still failing
+        ensureRed(INDEX_NAME);
+
+        logger.info("Retrying to allocate failed shards");
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        // Cluster stays red as the remote interactions are still failing
+        ensureRed(INDEX_NAME);
+
+        logger.info("Stop failing all remote store interactions");
+        translogRepo.setRandomControlIOExceptionRate(0d);
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        ensureGreen(INDEX_NAME);
+    }
 }