Skip to content

[Segment Replication] Remove Doc Parsing for segment replication enabled replica shard during translog replay from recovery #9002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,14 @@
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.index.query.QueryBuilders.boolQuery;
import static org.opensearch.index.query.QueryBuilders.rangeQuery;
import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -1343,4 +1347,76 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
ensureGreen(INDEX_NAME);
waitForSearchableDocs(2, nodes);
}

public void testIndexWhileRecoveringReplica() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate(INDEX_NAME).setMapping(
jsonBuilder().startObject()
.startObject("_routing")
.field("required", true)
.endObject()
.startObject("properties")
.startObject("online")
.field("type", "boolean")
.endObject()
.startObject("ts")
.field("type", "date")
.field("ignore_malformed", false)
.field("format", "epoch_millis")
.endObject()
.startObject("bs")
.field("type", "keyword")
.endObject()
.endObject()
.endObject()
)
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();

client().prepareIndex(INDEX_NAME)
.setId("1")
.setRouting("Y")
.setSource("online", false, "bs", "Y", "ts", System.currentTimeMillis() - 100, "type", "s")
.get();
client().prepareIndex(INDEX_NAME)
.setId("2")
.setRouting("X")
.setSource("online", true, "bs", "X", "ts", System.currentTimeMillis() - 10000000, "type", "s")
.get();
client().prepareIndex(INDEX_NAME)
.setId("3")
.setRouting(randomAlphaOfLength(2))
.setSource("online", false, "ts", System.currentTimeMillis() - 100, "type", "bs")
.get();
client().prepareIndex(INDEX_NAME)
.setId("4")
.setRouting(randomAlphaOfLength(2))
.setSource("online", true, "ts", System.currentTimeMillis() - 123123, "type", "bs")
.get();
refresh();
ensureGreen(INDEX_NAME);
waitForSearchableDocs(4, primaryNode, replicaNode);

SearchResponse response = client().prepareSearch(INDEX_NAME)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(
boolQuery().must(termQuery("online", true))
.must(
boolQuery().should(
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "bs"))
)
.should(
boolQuery().must(rangeQuery("ts").lt(System.currentTimeMillis() - (15 * 1000))).must(termQuery("type", "s"))
)
)
)
.setVersion(true)
.setFrom(0)
.setSize(100)
.setExplain(true)
.get();
assertNoFailures(response);
}
}
49 changes: 28 additions & 21 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,8 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
autoGeneratedTimestamp,
isRetry,
Engine.Operation.Origin.PRIMARY,
sourceToParse
sourceToParse,
null
);
}

Expand All @@ -942,23 +943,6 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
if (indexSettings.isSegRepEnabled()) {
Engine.Index index = new Engine.Index(
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
autoGeneratedTimeStamp,
isRetry,
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
}
return applyIndexOperation(
getEngine(),
seqNo,
Expand All @@ -970,7 +954,8 @@ public Engine.IndexResult applyIndexOperationOnReplica(
autoGeneratedTimeStamp,
isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse
sourceToParse,
id
);
}

Expand All @@ -985,8 +970,29 @@ private Engine.IndexResult applyIndexOperation(
long autoGeneratedTimeStamp,
boolean isRetry,
Engine.Operation.Origin origin,
SourceToParse sourceToParse
SourceToParse sourceToParse,
String id
) throws IOException {

// For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary
// shard.
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
Engine.Index index = new Engine.Index(
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null),
seqNo,
opPrimaryTerm,
version,
null,
Engine.Operation.Origin.REPLICA,
System.nanoTime(),
autoGeneratedTimeStamp,
isRetry,
UNASSIGNED_SEQ_NO,
0
);
return getEngine().index(index);
}
assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ "
+ opPrimaryTerm
+ " ] > shard term ["
Expand Down Expand Up @@ -2171,7 +2177,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
index.source(),
MediaTypeRegistry.xContentType(index.source()),
index.routing()
)
),
index.id()
);
break;
case DELETE:
Expand Down