Skip to content

Commit feb6f19

Browse files
[Segment Replication] Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery (#9002)
* Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Adding unit test to verify document is not parsed on an segment replication enabled replica shard. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * remove unnecessary unit tests and address comments. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * address comments on PR. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> --------- Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> (cherry picked from commit 435408b) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent d090a7f commit feb6f19

File tree

2 files changed

+104
-21
lines changed

2 files changed

+104
-21
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,14 @@
9191
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
9292
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
9393
import static org.opensearch.index.query.QueryBuilders.matchQuery;
94+
import static org.opensearch.index.query.QueryBuilders.termQuery;
95+
import static org.opensearch.index.query.QueryBuilders.boolQuery;
96+
import static org.opensearch.index.query.QueryBuilders.rangeQuery;
9497
import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;
9598
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
9699
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
97100
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
101+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
98102
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
99103

100104
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -1350,4 +1354,76 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
13501354
ensureGreen(INDEX_NAME);
13511355
waitForSearchableDocs(2, nodes);
13521356
}
1357+
1358+
public void testIndexWhileRecoveringReplica() throws Exception {
1359+
final String primaryNode = internalCluster().startDataOnlyNode();
1360+
assertAcked(
1361+
prepareCreate(INDEX_NAME).setMapping(
1362+
jsonBuilder().startObject()
1363+
.startObject("_routing")
1364+
.field("required", true)
1365+
.endObject()
1366+
.startObject("properties")
1367+
.startObject("online")
1368+
.field("type", "boolean")
1369+
.endObject()
1370+
.startObject("ts")
1371+
.field("type", "date")
1372+
.field("ignore_malformed", false)
1373+
.field("format", "epoch_millis")
1374+
.endObject()
1375+
.startObject("bs")
1376+
.field("type", "keyword")
1377+
.endObject()
1378+
.endObject()
1379+
.endObject()
1380+
)
1381+
);
1382+
ensureYellow(INDEX_NAME);
1383+
final String replicaNode = internalCluster().startDataOnlyNode();
1384+
1385+
client().prepareIndex(INDEX_NAME)
1386+
.setId("1")
1387+
.setRouting("Y")
1388+
.setSource("online", false, "bs", "Y", "ts", System.currentTimeMillis() - 100, "type", "s")
1389+
.get();
1390+
client().prepareIndex(INDEX_NAME)
1391+
.setId("2")
1392+
.setRouting("X")
1393+
.setSource("online", true, "bs", "X", "ts", System.currentTimeMillis() - 10000000, "type", "s")
1394+
.get();
1395+
client().prepareIndex(INDEX_NAME)
1396+
.setId("3")
1397+
.setRouting(randomAlphaOfLength(2))
1398+
.setSource("online", false, "ts", System.currentTimeMillis() - 100, "type", "bs")
1399+
.get();
1400+
client().prepareIndex(INDEX_NAME)
1401+
.setId("4")
1402+
.setRouting(randomAlphaOfLength(2))
1403+
.setSource("online", true, "ts", System.currentTimeMillis() - 123123, "type", "bs")
1404+
.get();
1405+
refresh();
1406+
ensureGreen(INDEX_NAME);
1407+
waitForSearchableDocs(4, primaryNode, replicaNode);
1408+
1409+
SearchResponse response = client().prepareSearch(INDEX_NAME)
1410+
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
1411+
.setQuery(
1412+
boolQuery().must(termQuery("online", true))
1413+
.must(
1414+
boolQuery().should(
1415+
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "bs"))
1416+
)
1417+
.should(
1418+
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "s"))
1419+
)
1420+
)
1421+
)
1422+
.setVersion(true)
1423+
.setFrom(0)
1424+
.setSize(100)
1425+
.setExplain(true)
1426+
.get();
1427+
assertNoFailures(response);
1428+
}
13531429
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,8 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
945945
autoGeneratedTimestamp,
946946
isRetry,
947947
Engine.Operation.Origin.PRIMARY,
948-
sourceToParse
948+
sourceToParse,
949+
null
949950
);
950951
}
951952

@@ -958,23 +959,6 @@ public Engine.IndexResult applyIndexOperationOnReplica(
958959
boolean isRetry,
959960
SourceToParse sourceToParse
960961
) throws IOException {
961-
if (indexSettings.isSegRepEnabled()) {
962-
Engine.Index index = new Engine.Index(
963-
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
964-
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
965-
seqNo,
966-
opPrimaryTerm,
967-
version,
968-
null,
969-
Engine.Operation.Origin.REPLICA,
970-
System.nanoTime(),
971-
autoGeneratedTimeStamp,
972-
isRetry,
973-
UNASSIGNED_SEQ_NO,
974-
0
975-
);
976-
return getEngine().index(index);
977-
}
978962
return applyIndexOperation(
979963
getEngine(),
980964
seqNo,
@@ -986,7 +970,8 @@ public Engine.IndexResult applyIndexOperationOnReplica(
986970
autoGeneratedTimeStamp,
987971
isRetry,
988972
Engine.Operation.Origin.REPLICA,
989-
sourceToParse
973+
sourceToParse,
974+
id
990975
);
991976
}
992977

@@ -1001,8 +986,29 @@ private Engine.IndexResult applyIndexOperation(
1001986
long autoGeneratedTimeStamp,
1002987
boolean isRetry,
1003988
Engine.Operation.Origin origin,
1004-
SourceToParse sourceToParse
989+
SourceToParse sourceToParse,
990+
String id
1005991
) throws IOException {
992+
993+
// For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary
994+
// shard.
995+
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
996+
Engine.Index index = new Engine.Index(
997+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
998+
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
999+
seqNo,
1000+
opPrimaryTerm,
1001+
version,
1002+
null,
1003+
Engine.Operation.Origin.REPLICA,
1004+
System.nanoTime(),
1005+
autoGeneratedTimeStamp,
1006+
isRetry,
1007+
UNASSIGNED_SEQ_NO,
1008+
0
1009+
);
1010+
return getEngine().index(index);
1011+
}
10061012
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
10071013
+ opPrimaryTerm
10081014
+ " ] > shard term ["
@@ -2170,7 +2176,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
21702176
index.source(),
21712177
MediaTypeRegistry.xContentType(index.source()),
21722178
index.routing()
2173-
)
2179+
),
2180+
index.id()
21742181
);
21752182
break;
21762183
case DELETE:

0 commit comments

Comments
 (0)