Skip to content

Commit e4eb9f0

Browse files
committed
PrimaryShardAllocator refactor to abstract out shard state and method calls
Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com>
1 parent 0c839c3 commit e4eb9f0

File tree

3 files changed

+178
-67
lines changed

3 files changed

+178
-67
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
152152
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
153153
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
154154
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
155+
- PrimaryShardAllocator refactor to abstract out shard state and method calls ([#9760](https://github.com/opensearch-project/OpenSearch/pull/9760)))
155156

156157
### Deprecated
157158

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,33 @@
3434

3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
37+
import org.opensearch.cluster.node.DiscoveryNode;
3738
import org.opensearch.cluster.routing.RecoverySource;
3839
import org.opensearch.cluster.routing.RoutingNode;
40+
import org.opensearch.cluster.routing.RoutingNodes;
3941
import org.opensearch.cluster.routing.ShardRouting;
4042
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
4143
import org.opensearch.cluster.routing.allocation.AllocationDecision;
4244
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
4345
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
4446
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
4547
import org.opensearch.cluster.routing.allocation.decider.Decision;
48+
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
4649

4750
import java.util.ArrayList;
51+
import java.util.Comparator;
52+
import java.util.HashMap;
53+
import java.util.Iterator;
4854
import java.util.List;
55+
import java.util.Set;
56+
import java.util.TreeMap;
57+
import java.util.concurrent.ConcurrentMap;
58+
import java.util.stream.Collectors;
4959

5060
/**
5161
* An abstract class that implements basic functionality for allocating
5262
* shards to nodes based on shard copies that already exist in the cluster.
53-
*
63+
* <p>
5464
* Individual implementations of this class are responsible for providing
5565
* the logic to determine to which nodes (if any) those shards are allocated.
5666
*
@@ -64,8 +74,9 @@ public abstract class BaseGatewayShardAllocator {
6474
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
6575
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
6676
* to make decisions on assigning shards to nodes.
67-
* @param shardRouting the shard to allocate
68-
* @param allocation the allocation state container object
77+
*
78+
* @param shardRouting the shard to allocate
79+
* @param allocation the allocation state container object
6980
* @param unassignedAllocationHandler handles the allocation of the current shard
7081
*/
7182
public void allocateUnassigned(
@@ -109,9 +120,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
109120
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
110121
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
111122
*
112-
* @param unassignedShard the unassigned shard to allocate
113-
* @param allocation the current routing state
114-
* @param logger the logger
123+
* @param unassignedShard the unassigned shard to allocate
124+
* @param allocation the current routing state
125+
* @param logger the logger
115126
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
116127
*/
117128
public abstract AllocateUnassignedDecision makeAllocationDecision(
@@ -132,4 +143,68 @@ protected static List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouti
132143
}
133144
return results;
134145
}
146+
147+
protected static class NodeShardState {
148+
private final String allocationId;
149+
private final boolean primary;
150+
private final Exception storeException;
151+
private final ReplicationCheckpoint replicationCheckpoint;
152+
private final DiscoveryNode node;
153+
154+
public NodeShardState(DiscoveryNode node,
155+
String allocationId,
156+
boolean primary,
157+
ReplicationCheckpoint replicationCheckpoint,
158+
Exception storeException) {
159+
this.node = node;
160+
this.allocationId = allocationId;
161+
this.primary = primary;
162+
this.replicationCheckpoint = replicationCheckpoint;
163+
this.storeException = storeException;
164+
}
165+
166+
public String allocationId() {
167+
return this.allocationId;
168+
}
169+
170+
public boolean primary() {
171+
return this.primary;
172+
}
173+
174+
public ReplicationCheckpoint replicationCheckpoint() {
175+
return this.replicationCheckpoint;
176+
}
177+
178+
public Exception storeException() {
179+
return this.storeException;
180+
}
181+
182+
public DiscoveryNode getNode() {
183+
return this.node;
184+
}
185+
}
186+
187+
protected static class NodeShardStates {
188+
TreeMap<NodeShardState, DiscoveryNode> nodeShardStates;
189+
190+
public NodeShardStates(Comparator<NodeShardState> comparator) {
191+
this.nodeShardStates = new TreeMap<>(comparator);
192+
}
193+
194+
public void add(NodeShardState key, DiscoveryNode value) {
195+
this.nodeShardStates.put(key, value);
196+
}
197+
198+
public DiscoveryNode get(NodeShardState key) {
199+
return this.nodeShardStates.get(key);
200+
}
201+
202+
public int size() {
203+
return this.nodeShardStates.size();
204+
}
205+
206+
public Iterator<NodeShardState> iterator() {
207+
return this.nodeShardStates.keySet().iterator();
208+
}
209+
}
135210
}

0 commit comments

Comments
 (0)