|
27 | 27 | import org.opensearch.common.blobstore.BlobPath;
|
28 | 28 | import org.opensearch.common.io.PathUtils;
|
29 | 29 | import org.opensearch.common.settings.Settings;
|
| 30 | +import org.opensearch.common.unit.TimeValue; |
30 | 31 | import org.opensearch.common.util.io.IOUtils;
|
31 | 32 | import org.opensearch.core.index.Index;
|
32 | 33 | import org.opensearch.core.rest.RestStatus;
|
33 | 34 | import org.opensearch.index.IndexService;
|
34 | 35 | import org.opensearch.index.IndexSettings;
|
| 36 | +import org.opensearch.index.mapper.MapperService; |
35 | 37 | import org.opensearch.index.remote.RemoteStoreEnums;
|
36 | 38 | import org.opensearch.index.shard.IndexShard;
|
37 | 39 | import org.opensearch.indices.IndicesService;
|
38 | 40 | import org.opensearch.indices.RemoteStoreSettings;
|
39 | 41 | import org.opensearch.indices.recovery.RecoveryState;
|
40 | 42 | import org.opensearch.indices.replication.common.ReplicationType;
|
| 43 | +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; |
41 | 44 | import org.opensearch.repositories.blobstore.BlobStoreRepository;
|
42 | 45 | import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
|
43 | 46 | import org.opensearch.snapshots.SnapshotInfo;
|
44 | 47 | import org.opensearch.snapshots.SnapshotRestoreException;
|
45 | 48 | import org.opensearch.snapshots.SnapshotState;
|
| 49 | +import org.opensearch.test.BackgroundIndexer; |
46 | 50 | import org.opensearch.test.InternalTestCluster;
|
47 | 51 | import org.opensearch.test.OpenSearchIntegTestCase;
|
48 | 52 | import org.junit.After;
|
|
53 | 57 | import java.nio.file.Path;
|
54 | 58 | import java.util.ArrayList;
|
55 | 59 | import java.util.Arrays;
|
| 60 | +import java.util.HashMap; |
56 | 61 | import java.util.List;
|
57 | 62 | import java.util.Map;
|
58 | 63 | import java.util.Objects;
|
59 | 64 | import java.util.Optional;
|
60 | 65 | import java.util.concurrent.ExecutionException;
|
| 66 | +import java.util.concurrent.TimeUnit; |
61 | 67 | import java.util.stream.Collectors;
|
62 | 68 | import java.util.stream.Stream;
|
63 | 69 |
|
64 | 70 | import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
|
| 71 | +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; |
65 | 72 | import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
|
66 | 73 | import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
|
67 | 74 | import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
|
@@ -885,4 +892,88 @@ public void testRestoreOperationsUsingDifferentRepos() throws Exception {
|
885 | 892 | ensureGreen(indexName1);
|
886 | 893 | assertDocsPresentInIndex(client, indexName1, 3 * numDocsInIndex1);
|
887 | 894 | }
|
| 895 | + |
| 896 | + public void testContinuousIndexing() throws Exception { |
| 897 | + internalCluster().startClusterManagerOnlyNode(); |
| 898 | + internalCluster().startDataOnlyNode(); |
| 899 | + String index = "test-index"; |
| 900 | + String snapshotRepo = "test-restore-snapshot-repo"; |
| 901 | + String baseSnapshotName = "snapshot_"; |
| 902 | + Path absolutePath1 = randomRepoPath().toAbsolutePath(); |
| 903 | + logger.info("Snapshot Path [{}]", absolutePath1); |
| 904 | + |
| 905 | + createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true)); |
| 906 | + |
| 907 | + Client client = client(); |
| 908 | + Settings indexSettings = Settings.builder() |
| 909 | + .put(super.indexSettings()) |
| 910 | + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) |
| 911 | + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) |
| 912 | + .build(); |
| 913 | + |
| 914 | + createIndex(index, indexSettings); |
| 915 | + ensureGreen(index); |
| 916 | + |
| 917 | + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( |
| 918 | + RemoteStorePinnedTimestampService.class, |
| 919 | + primaryNodeName(index) |
| 920 | + ); |
| 921 | + |
| 922 | + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5))); |
| 923 | + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5))); |
| 924 | + |
| 925 | + long totalDocs = 0; |
| 926 | + Map<String, Long> snapshots = new HashMap<>(); |
| 927 | + int numDocs = randomIntBetween(200, 300); |
| 928 | + totalDocs += numDocs; |
| 929 | + try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) { |
| 930 | + int numberOfSnapshots = 5; |
| 931 | + for (int i = 0; i < numberOfSnapshots; i++) { |
| 932 | + logger.info("--> waiting for {} docs to be indexed ...", numDocs); |
| 933 | + long finalTotalDocs1 = totalDocs; |
| 934 | + assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS); |
| 935 | + logger.info("--> {} total docs indexed", totalDocs); |
| 936 | + String snapshotName = baseSnapshotName + i; |
| 937 | + createSnapshot(snapshotRepo, snapshotName, new ArrayList<>()); |
| 938 | + snapshots.put(snapshotName, totalDocs); |
| 939 | + if (i < numberOfSnapshots - 1) { |
| 940 | + numDocs = randomIntBetween(200, 300); |
| 941 | + indexer.continueIndexing(numDocs); |
| 942 | + totalDocs += numDocs; |
| 943 | + } |
| 944 | + } |
| 945 | + } |
| 946 | + |
| 947 | + logger.info("Snapshots Status: " + snapshots); |
| 948 | + |
| 949 | + for (String snapshot : snapshots.keySet()) { |
| 950 | + logger.info("Restoring snapshot: {}", snapshot); |
| 951 | + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get()); |
| 952 | + |
| 953 | + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin() |
| 954 | + .cluster() |
| 955 | + .prepareRestoreSnapshot(snapshotRepo, snapshot) |
| 956 | + .setWaitForCompletion(true) |
| 957 | + .setIndices() |
| 958 | + .get(); |
| 959 | + |
| 960 | + assertEquals(RestStatus.OK, restoreSnapshotResponse1.status()); |
| 961 | + |
| 962 | + // Verify restored index's stats |
| 963 | + ensureGreen(TimeValue.timeValueSeconds(60), index); |
| 964 | + long finalTotalDocs = totalDocs; |
| 965 | + assertBusy(() -> { |
| 966 | + Long hits = client().prepareSearch(index) |
| 967 | + .setQuery(matchAllQuery()) |
| 968 | + .setSize((int) finalTotalDocs) |
| 969 | + .storedFields() |
| 970 | + .execute() |
| 971 | + .actionGet() |
| 972 | + .getHits() |
| 973 | + .getTotalHits().value; |
| 974 | + |
| 975 | + assertEquals(snapshots.get(snapshot), hits); |
| 976 | + }); |
| 977 | + } |
| 978 | + } |
888 | 979 | }
|
0 commit comments