Skip to content

Commit dac7cf1

Browse files
Rishikesh1159kaushalmahi12
authored andcommitted
[Segment Replication] Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery (opensearch-project#9002)
* Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Adding unit test to verify document is not parsed on an segment replication enabled replica shard. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * remove unnecessary unit tests and address comments. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * address comments on PR. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> --------- Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
1 parent 55a12dc commit dac7cf1

File tree

2 files changed

+104
-21
lines changed

2 files changed

+104
-21
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,14 @@
9191
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
9292
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
9393
import static org.opensearch.index.query.QueryBuilders.matchQuery;
94+
import static org.opensearch.index.query.QueryBuilders.termQuery;
95+
import static org.opensearch.index.query.QueryBuilders.boolQuery;
96+
import static org.opensearch.index.query.QueryBuilders.rangeQuery;
9497
import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;
9598
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
9699
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
97100
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
101+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
98102
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
99103

100104
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -1349,4 +1353,76 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
13491353
ensureGreen(INDEX_NAME);
13501354
waitForSearchableDocs(2, nodes);
13511355
}
1356+
1357+
public void testIndexWhileRecoveringReplica() throws Exception {
1358+
final String primaryNode = internalCluster().startDataOnlyNode();
1359+
assertAcked(
1360+
prepareCreate(INDEX_NAME).setMapping(
1361+
jsonBuilder().startObject()
1362+
.startObject("_routing")
1363+
.field("required", true)
1364+
.endObject()
1365+
.startObject("properties")
1366+
.startObject("online")
1367+
.field("type", "boolean")
1368+
.endObject()
1369+
.startObject("ts")
1370+
.field("type", "date")
1371+
.field("ignore_malformed", false)
1372+
.field("format", "epoch_millis")
1373+
.endObject()
1374+
.startObject("bs")
1375+
.field("type", "keyword")
1376+
.endObject()
1377+
.endObject()
1378+
.endObject()
1379+
)
1380+
);
1381+
ensureYellow(INDEX_NAME);
1382+
final String replicaNode = internalCluster().startDataOnlyNode();
1383+
1384+
client().prepareIndex(INDEX_NAME)
1385+
.setId("1")
1386+
.setRouting("Y")
1387+
.setSource("online", false, "bs", "Y", "ts", System.currentTimeMillis() - 100, "type", "s")
1388+
.get();
1389+
client().prepareIndex(INDEX_NAME)
1390+
.setId("2")
1391+
.setRouting("X")
1392+
.setSource("online", true, "bs", "X", "ts", System.currentTimeMillis() - 10000000, "type", "s")
1393+
.get();
1394+
client().prepareIndex(INDEX_NAME)
1395+
.setId("3")
1396+
.setRouting(randomAlphaOfLength(2))
1397+
.setSource("online", false, "ts", System.currentTimeMillis() - 100, "type", "bs")
1398+
.get();
1399+
client().prepareIndex(INDEX_NAME)
1400+
.setId("4")
1401+
.setRouting(randomAlphaOfLength(2))
1402+
.setSource("online", true, "ts", System.currentTimeMillis() - 123123, "type", "bs")
1403+
.get();
1404+
refresh();
1405+
ensureGreen(INDEX_NAME);
1406+
waitForSearchableDocs(4, primaryNode, replicaNode);
1407+
1408+
SearchResponse response = client().prepareSearch(INDEX_NAME)
1409+
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
1410+
.setQuery(
1411+
boolQuery().must(termQuery("online", true))
1412+
.must(
1413+
boolQuery().should(
1414+
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "bs"))
1415+
)
1416+
.should(
1417+
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "s"))
1418+
)
1419+
)
1420+
)
1421+
.setVersion(true)
1422+
.setFrom(0)
1423+
.setSize(100)
1424+
.setExplain(true)
1425+
.get();
1426+
assertNoFailures(response);
1427+
}
13521428
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -940,7 +940,8 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
940940
autoGeneratedTimestamp,
941941
isRetry,
942942
Engine.Operation.Origin.PRIMARY,
943-
sourceToParse
943+
sourceToParse,
944+
null
944945
);
945946
}
946947

@@ -953,23 +954,6 @@ public Engine.IndexResult applyIndexOperationOnReplica(
953954
boolean isRetry,
954955
SourceToParse sourceToParse
955956
) throws IOException {
956-
if (indexSettings.isSegRepEnabled()) {
957-
Engine.Index index = new Engine.Index(
958-
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
959-
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
960-
seqNo,
961-
opPrimaryTerm,
962-
version,
963-
null,
964-
Engine.Operation.Origin.REPLICA,
965-
System.nanoTime(),
966-
autoGeneratedTimeStamp,
967-
isRetry,
968-
UNASSIGNED_SEQ_NO,
969-
0
970-
);
971-
return getEngine().index(index);
972-
}
973957
return applyIndexOperation(
974958
getEngine(),
975959
seqNo,
@@ -981,7 +965,8 @@ public Engine.IndexResult applyIndexOperationOnReplica(
981965
autoGeneratedTimeStamp,
982966
isRetry,
983967
Engine.Operation.Origin.REPLICA,
984-
sourceToParse
968+
sourceToParse,
969+
id
985970
);
986971
}
987972

@@ -996,8 +981,29 @@ private Engine.IndexResult applyIndexOperation(
996981
long autoGeneratedTimeStamp,
997982
boolean isRetry,
998983
Engine.Operation.Origin origin,
999-
SourceToParse sourceToParse
984+
SourceToParse sourceToParse,
985+
String id
1000986
) throws IOException {
987+
988+
// For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary
989+
// shard.
990+
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
991+
Engine.Index index = new Engine.Index(
992+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
993+
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
994+
seqNo,
995+
opPrimaryTerm,
996+
version,
997+
null,
998+
Engine.Operation.Origin.REPLICA,
999+
System.nanoTime(),
1000+
autoGeneratedTimeStamp,
1001+
isRetry,
1002+
UNASSIGNED_SEQ_NO,
1003+
0
1004+
);
1005+
return getEngine().index(index);
1006+
}
10011007
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
10021008
+ opPrimaryTerm
10031009
+ " ] > shard term ["
@@ -2164,7 +2170,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
21642170
index.source(),
21652171
MediaTypeRegistry.xContentType(index.source()),
21662172
index.routing()
2167-
)
2173+
),
2174+
index.id()
21682175
);
21692176
break;
21702177
case DELETE:

0 commit comments

Comments
 (0)