35
35
import org .opensearch .cluster .node .DiscoveryNode ;
36
36
import org .opensearch .cluster .node .DiscoveryNodes ;
37
37
import org .opensearch .cluster .routing .ShardRouting ;
38
- import org .opensearch .cluster .routing .allocation .AllocateUnassignedDecision ;
39
38
import org .opensearch .cluster .routing .allocation .FailedShard ;
40
39
import org .opensearch .cluster .routing .allocation .RoutingAllocation ;
41
40
import org .opensearch .core .index .shard .ShardId ;
42
41
import org .opensearch .gateway .AsyncShardFetch ;
43
42
import org .opensearch .gateway .GatewayAllocator ;
44
43
import org .opensearch .gateway .PrimaryShardAllocator ;
45
- import org .opensearch .gateway .PrimaryShardBatchAllocator ;
46
44
import org .opensearch .gateway .ReplicaShardAllocator ;
47
- import org .opensearch .gateway .ReplicaShardBatchAllocator ;
48
- import org .opensearch .gateway .TransportNodesListGatewayStartedBatchShards ;
49
45
import org .opensearch .gateway .TransportNodesListGatewayStartedShards .NodeGatewayStartedShards ;
50
46
import org .opensearch .indices .replication .checkpoint .ReplicationCheckpoint ;
51
47
import org .opensearch .indices .store .TransportNodesListShardStoreMetadata .NodeStoreFilesMetadata ;
52
- import org .opensearch .indices .store .TransportNodesListShardStoreMetadataBatch ;
53
48
54
49
import java .util .Collections ;
55
50
import java .util .HashMap ;
62
57
* A gateway allocator implementation that keeps an in memory list of started shard allocation
63
58
* that are used as replies to the, normally async, fetch data requests. The in memory list
64
59
* is adapted when shards are started and failed.
65
- *
60
+ * <p>
66
61
* Nodes leaving and joining the cluster do not change the list of shards the class tracks but
67
62
* rather serves as a filter to what is returned by fetch data. Concretely - fetch data will
68
63
* only return shards that were started on nodes that are currently part of the cluster.
69
- *
64
+ * <p>
70
65
* For now only primary shard related data is fetched. Replica request always get an empty response.
71
- *
66
+ * <p>
72
67
*
73
68
* This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do
74
69
* not have all the infrastructure required to use it.
@@ -103,52 +98,7 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
103
98
)
104
99
);
105
100
106
- return new AsyncShardFetch .FetchResult <>(foundShards , new HashMap <>() {
107
- {
108
- put (shardId , ignoreNodes );
109
- }
110
- });
111
- }
112
- };
113
-
114
- PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator () {
115
- @ Override
116
- protected AsyncShardFetch .FetchResult <TransportNodesListGatewayStartedBatchShards .NodeGatewayStartedShardsBatch > fetchData (
117
- Set <ShardRouting > shardsEligibleForFetch ,
118
- Set <ShardRouting > inEligibleShards ,
119
- RoutingAllocation allocation
120
- ) {
121
- Map <DiscoveryNode , TransportNodesListGatewayStartedBatchShards .NodeGatewayStartedShardsBatch > foundShards = new HashMap <>();
122
- HashMap <ShardId , Set <String >> shardsToIgnoreNodes = new HashMap <>();
123
- for (Map .Entry <String , Map <ShardId , ShardRouting >> entry : knownAllocations .entrySet ()) {
124
- String nodeId = entry .getKey ();
125
- Map <ShardId , ShardRouting > shardsOnNode = entry .getValue ();
126
- HashMap <ShardId , TransportNodesListGatewayStartedBatchShards .NodeGatewayStartedShard > adaptedResponse = new HashMap <>();
127
-
128
- for (ShardRouting shardRouting : shardsEligibleForFetch ) {
129
- ShardId shardId = shardRouting .shardId ();
130
- Set <String > ignoreNodes = allocation .getIgnoreNodes (shardId );
131
-
132
- if (shardsOnNode .containsKey (shardId ) && ignoreNodes .contains (nodeId ) == false && currentNodes .nodeExists (nodeId )) {
133
- TransportNodesListGatewayStartedBatchShards .NodeGatewayStartedShard nodeShard =
134
- new TransportNodesListGatewayStartedBatchShards .NodeGatewayStartedShard (
135
- shardRouting .allocationId ().getId (),
136
- shardRouting .primary (),
137
- getReplicationCheckpoint (shardId , nodeId )
138
- );
139
- adaptedResponse .put (shardId , nodeShard );
140
- shardsToIgnoreNodes .put (shardId , ignoreNodes );
141
- }
142
- foundShards .put (
143
- currentNodes .get (nodeId ),
144
- new TransportNodesListGatewayStartedBatchShards .NodeGatewayStartedShardsBatch (
145
- currentNodes .get (nodeId ),
146
- adaptedResponse
147
- )
148
- );
149
- }
150
- }
151
- return new AsyncShardFetch .FetchResult <>(foundShards , shardsToIgnoreNodes );
101
+ return new AsyncShardFetch .FetchResult <>(shardId , foundShards , ignoreNodes );
152
102
}
153
103
};
154
104
@@ -161,28 +111,7 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n
161
111
protected AsyncShardFetch .FetchResult <NodeStoreFilesMetadata > fetchData (ShardRouting shard , RoutingAllocation allocation ) {
162
112
// for now, just pretend no node has data
163
113
final ShardId shardId = shard .shardId ();
164
- return new AsyncShardFetch .FetchResult <>(Collections .emptyMap (), new HashMap <>() {
165
- {
166
- put (shardId , allocation .getIgnoreNodes (shardId ));
167
- }
168
- });
169
- }
170
-
171
- @ Override
172
- protected boolean hasInitiatedFetching (ShardRouting shard ) {
173
- return true ;
174
- }
175
- };
176
-
177
- ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator () {
178
-
179
- @ Override
180
- protected AsyncShardFetch .FetchResult <TransportNodesListShardStoreMetadataBatch .NodeStoreFilesMetadataBatch > fetchData (
181
- Set <ShardRouting > shardsEligibleForFetch ,
182
- Set <ShardRouting > inEligibleShards ,
183
- RoutingAllocation allocation
184
- ) {
185
- return new AsyncShardFetch .FetchResult <>(Collections .emptyMap (), Collections .emptyMap ());
114
+ return new AsyncShardFetch .FetchResult <>(shardId , Collections .emptyMap (), allocation .getIgnoreNodes (shardId ));
186
115
}
187
116
188
117
@ Override
@@ -228,12 +157,6 @@ public void allocateUnassigned(
228
157
innerAllocatedUnassigned (allocation , primaryShardAllocator , replicaShardAllocator , shardRouting , unassignedAllocationHandler );
229
158
}
230
159
231
- @ Override
232
- public void allocateUnassignedBatch (RoutingAllocation allocation , boolean primary ) {
233
- currentNodes = allocation .nodes ();
234
- innerAllocateUnassignedBatch (allocation , primaryBatchShardAllocator , replicaBatchShardAllocator , primary );
235
- }
236
-
237
160
/**
238
161
* manually add a specific shard to the allocations the gateway keeps track of
239
162
*/
@@ -248,34 +171,4 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) {
248
171
public void addReplicationCheckpoint (ShardId shardId , String nodeName , ReplicationCheckpoint replicationCheckpoint ) {
249
172
shardIdNodeToReplicationCheckPointMap .putIfAbsent (getReplicationCheckPointKey (shardId , nodeName ), replicationCheckpoint );
250
173
}
251
-
252
- public Set <String > createAndUpdateBatches (RoutingAllocation allocation , boolean primary ) {
253
- return super .createAndUpdateBatches (allocation , primary );
254
- }
255
-
256
- public void safelyRemoveShardFromBatch (ShardRouting shard ) {
257
- super .safelyRemoveShardFromBatch (shard );
258
- }
259
-
260
- public void safelyRemoveShardFromBothBatch (ShardRouting shardRouting ) {
261
- super .safelyRemoveShardFromBothBatch (shardRouting );
262
- }
263
-
264
- public String getBatchId (ShardRouting shard , boolean primary ) {
265
- return super .getBatchId (shard , primary );
266
- }
267
-
268
- public Map <String , GatewayAllocator .ShardsBatch > getBatchIdToStartedShardBatch () {
269
- return batchIdToStartedShardBatch ;
270
- }
271
-
272
- public Map <String , GatewayAllocator .ShardsBatch > getBatchIdToStoreShardBatch () {
273
- return batchIdToStoreShardBatch ;
274
- }
275
-
276
- @ Override
277
- public AllocateUnassignedDecision explainUnassignedShardAllocation (ShardRouting unassignedShard , RoutingAllocation routingAllocation ) {
278
- setShardAllocators (primaryShardAllocator , replicaShardAllocator , primaryBatchShardAllocator , replicaBatchShardAllocator );
279
- return super .explainUnassignedShardAllocation (unassignedShard , routingAllocation );
280
- }
281
174
}
0 commit comments