|
28 | 28 | import java.util.Arrays;
|
29 | 29 | import java.util.Collections;
|
30 | 30 | import java.util.Comparator;
|
| 31 | +import java.util.HashMap; |
31 | 32 | import java.util.HashSet;
|
32 | 33 | import java.util.List;
|
33 | 34 | import java.util.Locale;
|
@@ -258,22 +259,14 @@ public ShardIterator activeInitializingShardsRankedIt(
|
258 | 259 | return new ShardIterator(shardId, ordered);
|
259 | 260 | }
|
260 | 261 |
|
261 |
| - private static Set<String> getAllNodeIds(final List<ShardRouting> shards) { |
262 |
| - final Set<String> nodeIds = new HashSet<>(); |
263 |
| - for (ShardRouting shard : shards) { |
264 |
| - nodeIds.add(shard.currentNodeId()); |
265 |
| - } |
266 |
| - return nodeIds; |
267 |
| - } |
268 |
| - |
269 | 262 | private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> getNodeStats(
|
270 |
| - final Set<String> nodeIds, |
| 263 | + List<ShardRouting> shardRoutings, |
271 | 264 | final ResponseCollectorService collector
|
272 | 265 | ) {
|
273 | 266 |
|
274 |
| - final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = Maps.newMapWithExpectedSize(nodeIds.size()); |
275 |
| - for (String nodeId : nodeIds) { |
276 |
| - nodeStats.put(nodeId, collector.getNodeStatistics(nodeId)); |
| 267 | + final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>(); |
| 268 | + for (ShardRouting shardRouting : shardRoutings) { |
| 269 | + nodeStats.computeIfAbsent(shardRouting.currentNodeId(), collector::getNodeStatistics); |
277 | 270 | }
|
278 | 271 | return nodeStats;
|
279 | 272 | }
|
@@ -342,32 +335,28 @@ private static List<ShardRouting> rankShardsAndUpdateStats(
|
342 | 335 | }
|
343 | 336 |
|
344 | 337 | // Retrieve which nodes we can potentially send the query to
|
345 |
| - final Set<String> nodeIds = getAllNodeIds(shards); |
346 |
| - final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector); |
| 338 | + final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(shards, collector); |
347 | 339 |
|
348 | 340 | // Retrieve all the nodes the shards exist on
|
349 |
| - final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts); |
350 | 341 |
|
351 | 342 | // sort all shards based on the shard rank
|
352 | 343 | ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
|
353 |
| - Collections.sort(sortedShards, new NodeRankComparator(nodeRanks)); |
| 344 | + sortedShards.sort(new NodeRankComparator(rankNodes(nodeStats, nodeSearchCounts))); |
354 | 345 |
|
355 | 346 | // adjust the non-winner nodes' stats so they will get a chance to receive queries
|
356 |
| - if (sortedShards.size() > 1) { |
357 |
| - ShardRouting minShard = sortedShards.get(0); |
358 |
| - // If the winning shard is not started we are ranking initializing |
359 |
| - // shards, don't bother to do adjustments |
360 |
| - if (minShard.started()) { |
361 |
| - String minNodeId = minShard.currentNodeId(); |
362 |
| - Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId); |
363 |
| - if (maybeMinStats.isPresent()) { |
364 |
| - adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); |
365 |
| - // Increase the number of searches for the "winning" node by one. |
366 |
| - // Note that this doesn't actually affect the "real" counts, instead |
367 |
| - // it only affects the captured node search counts, which is |
368 |
| - // captured once for each query in TransportSearchAction |
369 |
| - nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); |
370 |
| - } |
| 347 | + ShardRouting minShard = sortedShards.get(0); |
| 348 | + // If the winning shard is not started we are ranking initializing |
| 349 | + // shards, don't bother to do adjustments |
| 350 | + if (minShard.started()) { |
| 351 | + String minNodeId = minShard.currentNodeId(); |
| 352 | + Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId); |
| 353 | + if (maybeMinStats.isPresent()) { |
| 354 | + adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get()); |
| 355 | + // Increase the number of searches for the "winning" node by one. |
| 356 | + // Note that this doesn't actually affect the "real" counts, instead |
| 357 | + // it only affects the captured node search counts, which is |
| 358 | + // captured once for each query in TransportSearchAction |
| 359 | + nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1); |
371 | 360 | }
|
372 | 361 | }
|
373 | 362 |
|
|
0 commit comments