Skip to content

Commit 719a67d

Browse files
author
Sandeep Kumawat
committed
Fix for failur IT's
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
1 parent 37f1490 commit 719a67d

File tree

6 files changed

+238
-537
lines changed

6 files changed

+238
-537
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java

Lines changed: 135 additions & 522 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,20 @@
88

99
package org.opensearch.remotestore;
1010

11+
import org.opensearch.action.support.WriteRequest;
12+
import org.opensearch.cluster.routing.ShardRouting;
1113
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.index.shard.IndexShard;
1215
import org.opensearch.indices.replication.SegmentReplicationIT;
16+
import org.opensearch.test.InternalTestCluster;
1317
import org.opensearch.test.OpenSearchIntegTestCase;
1418
import org.junit.After;
1519
import org.junit.Before;
1620

1721
import java.nio.file.Path;
1822

23+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
24+
1925
/**
2026
* This class runs Segment Replication Integ test suite with remote store enabled.
2127
*/
@@ -40,6 +46,82 @@ protected boolean segmentReplicationWithRemoteEnabled() {
4046
return true;
4147
}
4248

49+
public void testRestartPrimary_NoReplicas() throws Exception {
50+
final String primary = internalCluster().startDataOnlyNode();
51+
createIndex(INDEX_NAME);
52+
ensureYellow(INDEX_NAME);
53+
54+
assertEquals(getNodeContainingPrimaryShard().getName(), primary);
55+
56+
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
57+
if (randomBoolean()) {
58+
flush(INDEX_NAME);
59+
} else {
60+
refresh(INDEX_NAME);
61+
}
62+
63+
internalCluster().restartNode(primary);
64+
ensureYellow(INDEX_NAME);
65+
assertDocCounts(1, primary);
66+
}
67+
68+
public void testReplicationPostDeleteAndForceMerge() throws Exception {
69+
final String primary = internalCluster().startDataOnlyNode();
70+
createIndex(INDEX_NAME);
71+
final String replica = internalCluster().startDataOnlyNode();
72+
ensureGreen(INDEX_NAME);
73+
final int initialDocCount = scaledRandomIntBetween(1, 10);
74+
for (int i = 0; i < initialDocCount; i++) {
75+
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
76+
}
77+
refresh(INDEX_NAME);
78+
waitForSearchableDocs(initialDocCount, primary, replica);
79+
80+
final int deletedDocCount = randomIntBetween(1, initialDocCount);
81+
for (int i = 0; i < deletedDocCount; i++) {
82+
client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
83+
}
84+
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
85+
86+
// randomly flush here after the force merge to wipe any old segments.
87+
if (randomBoolean()) {
88+
flush(INDEX_NAME);
89+
}
90+
91+
final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME);
92+
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
93+
assertBusy(
94+
() -> assertEquals(
95+
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
96+
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
97+
)
98+
);
99+
100+
// add some docs to the xlog and drop primary.
101+
final int additionalDocs = randomIntBetween(1, 5);
102+
for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) {
103+
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
104+
}
105+
// Drop the primary and wait until replica is promoted.
106+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
107+
ensureYellowAndNoInitializingShards(INDEX_NAME);
108+
109+
final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
110+
assertNotNull(replicaShardRouting);
111+
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
112+
refresh(INDEX_NAME);
113+
final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount;
114+
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
115+
116+
int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1;
117+
assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo());
118+
119+
// index another doc.
120+
client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get();
121+
refresh(INDEX_NAME);
122+
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1);
123+
}
124+
43125
@Before
44126
public void setup() {
45127
internalCluster().startClusterManagerOnlyNode();

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
190190
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
191191
// get a reference to the previous commit files so they can be decref'd once a new commit is made.
192192
final Collection<String> previousCommitFiles = getLastCommittedSegmentInfos().files(true);
193+
logger.info("[{}] commitSegmentInfos() [{}]", shardId, previousCommitFiles);
193194
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
194195
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
195196
// incref the latest on-disk commit.
@@ -370,7 +371,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
370371
ensureOpen();
371372
// Skip flushing for indices with partial locality (warm indices)
372373
// For these indices, we don't need to commit as we will sync from the remote store on re-open
373-
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
374+
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
374375
return;
375376
}
376377
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
@@ -447,7 +448,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
447448
latestSegmentInfos.changed();
448449
}
449450
try {
450-
commitSegmentInfos(latestSegmentInfos);
451+
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
452+
commitSegmentInfos(latestSegmentInfos);
453+
}
451454
} catch (IOException e) {
452455
// mark the store corrupted unless we are closing as result of engine failure.
453456
// in this case Engine#failShard will handle store corruption.

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ public void updateShardState(
767767
assert newRouting.primary() && currentRouting.primary() == false;
768768
ReplicationTimer timer = new ReplicationTimer();
769769
timer.start();
770-
logger.debug(
770+
logger.info(
771771
"Resetting engine on promotion of shard [{}] to primary, startTime {}\n",
772772
shardId,
773773
timer.startTime()
@@ -5160,8 +5160,10 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51605160
store.deleteQuiet(file);
51615161
}
51625162
}
5163-
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5164-
: "There should not be any segments file in the dir";
5163+
if (indexSettings.isStoreLocalityPartial() == false) {
5164+
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5165+
: "There should not be any segments file in the dir";
5166+
}
51655167
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
51665168
}
51675169
syncSegmentSuccess = true;

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,7 @@ public String[] listAll() throws IOException {
8686
logger.trace("Composite Directory[{}]: listAll() called", this::toString);
8787
String[] localFiles = localDirectory.listAll();
8888
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
89-
// String[] remoteFiles = getRemoteFiles();
90-
// allFiles.addAll(Arrays.asList(remoteFiles));
91-
logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
92-
// logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
89+
logger.trace("listAll Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles));
9390
Set<String> nonBlockLuceneFiles = allFiles.stream()
9491
.filter(file -> !FileTypeUtils.isBlockFile(file))
9592
.collect(Collectors.toUnmodifiableSet());
@@ -113,12 +110,16 @@ public void deleteFile(String name) throws IOException {
113110
if (FileTypeUtils.isTempFile(name)) {
114111
localDirectory.deleteFile(name);
115112
} else if (Arrays.asList(listAll()).contains(name) == false) {
116-
logger.debug("The file [{}] does not exist", name);
117-
// we should not fail here as localDirectory might not contain this file.
118-
// throw new NoSuchFileException("File " + name + " not found in directory");
113+
logger.debug("The file [{}] does not exist in local directory", name);
114+
// we should not throw exception in this case as localDirectory might not contain this file.
119115
} else {
120-
localDirectory.deleteFile(name);
121-
fileCache.remove(getFilePath(name));
116+
// It is possible that filecache doesn't have the file, but localdirectory contains the file. We will delete it from the localDirectory.
117+
if(fileCache.get(getFilePath(name)) == null) {
118+
logger.info("The file [{}] exist in local but not part of FileCache, deleting it from local", name);
119+
localDirectory.deleteFile(name);
120+
} else {
121+
fileCache.remove(getFilePath(name));
122+
}
122123
}
123124
}
124125

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void setup() throws IOException {
6161

6262
public void testListAll() throws IOException {
6363
String[] actualFileNames = compositeDirectory.listAll();
64-
String[] expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1", "temp_file.tmp" };
64+
String[] expectedFileNames = new String[] { "_1.cfe", "_2.cfe", "temp_file.tmp" };
6565
assertArrayEquals(expectedFileNames, actualFileNames);
6666
}
6767

0 commit comments

Comments
 (0)