From 6656af2d31d77bd8486bfed026c8045bc42fff6b Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 18 Oct 2023 14:31:17 +0000 Subject: [PATCH 1/4] Add primary mode check before asserting on primary mode. Signed-off-by: Rishikesh1159 --- .../SegmentReplicationSourceHandler.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index e2c47b0fb3159..0bf1cf1457f19 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -146,11 +146,17 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene ); }; cancellableThreads.checkForCancel(); - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); - if (targetShardRouting == null) { - logger.debug("delaying replication of {} as it is not listed as assigned to target node {}", shard.shardId(), targetNode); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + if (shard.isPrimaryMode()) { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying replication of {} as it is not listed as assigned to target node {}", + shard.shardId(), + targetNode + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } } final StepListener sendFileStep = new StepListener<>(); From 
1b5e7c589503a8802e67dfa48c0a818ae08b3633 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 19 Oct 2023 20:20:42 +0000 Subject: [PATCH 2/4] remove unnecessary shardRouting check. Signed-off-by: Rishikesh1159 --- .../SegmentReplicationSourceHandler.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 0bf1cf1457f19..674c09311c645 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -12,8 +12,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.StepListener; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ListenableFuture; @@ -22,7 +20,6 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.recovery.DelayRecoveryException; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; import org.opensearch.indices.replication.common.CopyState; @@ -146,18 +143,6 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene ); }; cancellableThreads.checkForCancel(); - if (shard.isPrimaryMode()) { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); - if (targetShardRouting == null) { - 
logger.debug( - "delaying replication of {} as it is not listed as assigned to target node {}", - shard.shardId(), - targetNode - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - } final StepListener sendFileStep = new StepListener<>(); Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); From 1192ee3e70b01ab460d07364dfce82cb2558894d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 19 Oct 2023 20:33:35 +0000 Subject: [PATCH 3/4] Add test logging. Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationRelocationIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index dd832a63d1e66..d4b19199aba36 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -26,6 +26,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; @@ -55,6 +56,7 @@ private void createIndex(int replicaCount) { * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. 
Before * relocation and after relocation documents are indexed and documents are verified */ + @TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE") public void testPrimaryRelocation() throws Exception { final String oldPrimary = internalCluster().startNode(); createIndex(1); From e07dc80a1ab6b7654ea3a7a6d2e55057def2a42e Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 20 Oct 2023 13:02:19 +0000 Subject: [PATCH 4/4] Addressing comments on PR. Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationRelocationIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index d4b19199aba36..dbe0b43441f54 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -56,7 +56,7 @@ private void createIndex(int replicaCount) { * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before * relocation and after relocation documents are indexed and documents are verified */ - @TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE") + @TestLogging(reason = "Getting trace logs from replication,shard and allocation package", value = "org.opensearch.indices.replication:TRACE, org.opensearch.index.shard:TRACE, org.opensearch.cluster.routing.allocation:TRACE") public void testPrimaryRelocation() throws Exception { final String oldPrimary = internalCluster().startNode(); createIndex(1);