Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137694.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137694
summary: Iterate directly over contents of `RoutingNode`
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -213,18 +214,16 @@ void remove(ShardRouting shard) {
assert invariant();
}

private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0];

public ShardRouting[] initializing() {
return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
public Iterable<ShardRouting> initializing() {
return Iterables.assertReadOnly(initializingShards);
}

public ShardRouting[] relocating() {
return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
public Iterable<ShardRouting> relocating() {
return Iterables.assertReadOnly(relocatingShards);
}

public ShardRouting[] started() {
return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
public Iterable<ShardRouting> started() {
return Iterables.assertReadOnly(startedShards);
}

/**
Expand Down Expand Up @@ -313,6 +312,8 @@ public String toString() {
return sb.toString();
}

private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0];

public ShardRouting[] copyShards() {
return shards.values().toArray(EMPTY_SHARD_ROUTING_ARRAY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -38,6 +39,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -294,12 +296,13 @@ public DesiredBalance compute(
final var rerouteExplanation = command.execute(routingAllocation, false);
assert rerouteExplanation.decisions().type() != Decision.Type.NO : "should have thrown for NO decision";
if (rerouteExplanation.decisions().type() != Decision.Type.NO) {
final ShardRouting[] initializingShards = routingNodes.node(
final Iterator<ShardRouting> initializingShardsIterator = routingNodes.node(
routingAllocation.nodes().resolveNode(command.toNode()).getId()
).initializing();
assert initializingShards.length == 1
: "expect exactly one relocating shard, but got: " + List.of(initializingShards);
final var initializingShard = initializingShards[0];
).initializing().iterator();
assert initializingShardsIterator.hasNext();
final var initializingShard = initializingShardsIterator.next();
assert initializingShardsIterator.hasNext() == false
: "expect exactly one relocating shard, but got: " + Iterators.toList(initializingShardsIterator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
: "expect exactly one relocating shard, but got: " + Iterators.toList(initializingShardsIterator);
: "expect exactly one relocating shard, but got: [" + initializingShard + "] and " + Iterators.toList(initializingShardsIterator);

assert routingAllocation.nodes()
.resolveNode(command.fromNode())
.getId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,16 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// Count the primaries currently doing recovery on the node, to ensure the primariesInitialRecoveries setting is obeyed.
int primariesInRecovery = 0;
final var returnUnexplainedDecision = allocation.debugDecision() == false;
for (ShardRouting shard : node.initializing()) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
if (returnUnexplainedDecision && primariesInRecovery >= primariesInitialRecoveries) {
// bail out early if we don't need the final total
return THROTTLE;
}
}
}
if (primariesInRecovery >= primariesInitialRecoveries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.common.collect;

import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
Expand Down Expand Up @@ -507,6 +508,42 @@ public T next() {
}
}

/**
* Adds a wrapper around {@code iterator} which asserts that {@link Iterator#remove()} is not called.
*/
public static <T> Iterator<T> assertReadOnly(final Iterator<T> iterator) {
return Assertions.ENABLED ? new AssertReadOnlyIterator<>(Objects.requireNonNull(iterator)) : iterator;
}

private static class AssertReadOnlyIterator<T> implements Iterator<T> {

private final Iterator<T> delegate;

AssertReadOnlyIterator(Iterator<T> delegate) {
this.delegate = delegate;
}

@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public T next() {
return delegate.next();
}

@Override
public void forEachRemaining(Consumer<? super T> action) {
delegate.forEachRemaining(action);
}

@Override
public void remove() {
throw new AssertionError();
}
}

public static <T> boolean equals(Iterator<? extends T> iterator1, Iterator<? extends T> iterator2, BiPredicate<T, T> itemComparer) {
if (iterator1 == null) {
return iterator2 == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

package org.elasticsearch.common.util.iterable;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.core.Assertions;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -81,4 +84,15 @@ public static <T> int indexOf(Iterable<T> iterable, Predicate<T> predicate) {
public static long size(Iterable<?> iterable) {
return StreamSupport.stream(iterable.spliterator(), false).count();
}

/**
* Adds a wrapper around {@code iterable} which asserts that {@link Iterator#remove()} is not called on the iterator it returns.
*/
public static <T> Iterable<T> assertReadOnly(Iterable<T> iterable) {
if (Assertions.ENABLED) {
return () -> Iterators.assertReadOnly(iterable.iterator());
} else {
return iterable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import org.elasticsearch.test.ESTestCase;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -161,7 +160,9 @@ public void testReturnStartedShards() {
}

private static Set<ShardId> startedShardsSet(RoutingNode routingNode) {
return Arrays.stream(routingNode.started()).map(ShardRouting::shardId).collect(Collectors.toSet());
final var result = new HashSet<ShardId>();
routingNode.started().forEach(shardRouting -> result.add(shardRouting.shardId()));
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.TimeProvider;
Expand Down Expand Up @@ -103,10 +104,10 @@
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -581,7 +582,7 @@ public void allocate(RoutingAllocation allocation) {
allocation.routingNodes().getRelocatingShardCount(),
equalTo(0)
);
assertThat(allocation.routingNodes().node("node-2").started(), arrayWithSize(2));
assertThat(Iterators.toList(allocation.routingNodes().node("node-2").started().iterator()), hasSize(2));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,19 @@ public void testCycle() {
}
}

public void testAssertReadOnly() {
assumeTrue("assertions enabled", Assertions.ENABLED);
final List<Integer> innerList = new ArrayList<>(List.of(1, 2, 3, 4));
assertTrue(Iterators.equals(innerList.iterator(), Iterators.assertReadOnly(innerList.iterator()), Objects::equals));

final var readonly = Iterators.assertReadOnly(innerList.iterator());
assertTrue(readonly.hasNext());
assertEquals(Integer.valueOf(1), readonly.next());
expectThrows(AssertionError.class, readonly::remove);

assertEquals(List.of(1, 2, 3, 4), innerList);
}

public void testEquals() {
final BiPredicate<Object, Object> notCalled = (a, b) -> { throw new AssertionError("not called"); };

Expand Down