Skip to content

Commit 7ce9886

Browse files
committed
Fix cluster being unable to spin up when disk usage exceeds the threshold
Signed-off-by: zane-neo <zaniu@amazon.com>
1 parent d9096b2 commit 7ce9886

File tree

5 files changed

+58
-56
lines changed

5 files changed

+58
-56
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4949
- Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080))
5050
- Fix array_index_out_of_bounds_exception when indexing documents with field name containing only dot ([#15126](https://github.com/opensearch-project/OpenSearch/pull/15126))
5151
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
52+
- Fix cluster being unable to spin up when disk usage exceeds the threshold ([#15258](https://github.com/opensearch-project/OpenSearch/pull/15258))
5253

5354
### Security
5455

server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
6767

6868
private final Set<ClusterBlock> global;
6969

70-
private Map<String, Set<ClusterBlock>> indicesBlocks;
70+
private final Map<String, Set<ClusterBlock>> indicesBlocks;
7171

72-
private EnumMap<ClusterBlockLevel, ImmutableLevelHolder> levelHolders;
72+
private final EnumMap<ClusterBlockLevel, ImmutableLevelHolder> levelHolders;
7373

7474
ClusterBlocks(Set<ClusterBlock> global, final Map<String, Set<ClusterBlock>> indicesBlocks) {
7575
this.global = global;
@@ -161,24 +161,6 @@ public boolean hasIndexBlock(String index, ClusterBlock block) {
161161
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
162162
}
163163

164-
public void removeIndexBlock(String index, ClusterBlock block) {
165-
Map<String, Set<ClusterBlock>> newIndicesBlocks = new HashMap<>(indicesBlocks); // copy to avoid UnsupportedOperationException>
166-
for (Map.Entry<String, Set<ClusterBlock>> entry : indicesBlocks.entrySet()) {
167-
String indexName = entry.getKey();
168-
Set<ClusterBlock> clusterBlockSet = new HashSet<>(entry.getValue());
169-
if (indexName.equals(index)) {
170-
clusterBlockSet.remove(block);
171-
if (clusterBlockSet.isEmpty()) {
172-
newIndicesBlocks.remove(indexName);
173-
} else {
174-
newIndicesBlocks.put(indexName, Collections.unmodifiableSet(clusterBlockSet));
175-
}
176-
}
177-
}
178-
this.indicesBlocks = Collections.unmodifiableMap(newIndicesBlocks);
179-
this.levelHolders = generateLevelHolders(global, indicesBlocks);
180-
}
181-
182164
public boolean hasIndexBlockWithId(String index, int blockId) {
183165
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
184166
if (clusterBlocks != null) {

server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
import org.opensearch.client.Client;
4040
import org.opensearch.cluster.ClusterInfo;
4141
import org.opensearch.cluster.ClusterState;
42+
import org.opensearch.cluster.ClusterStateUpdateTask;
4243
import org.opensearch.cluster.DiskUsage;
43-
import org.opensearch.cluster.block.ClusterBlock;
4444
import org.opensearch.cluster.block.ClusterBlockLevel;
4545
import org.opensearch.cluster.block.ClusterBlocks;
4646
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -50,8 +50,8 @@
5050
import org.opensearch.cluster.routing.RoutingNodes;
5151
import org.opensearch.cluster.routing.ShardRouting;
5252
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
53+
import org.opensearch.cluster.service.ClusterService;
5354
import org.opensearch.common.Priority;
54-
import org.opensearch.common.settings.ClusterSettings;
5555
import org.opensearch.common.settings.Settings;
5656
import org.opensearch.common.util.set.Sets;
5757
import org.opensearch.core.action.ActionListener;
@@ -82,6 +82,7 @@ public class DiskThresholdMonitor {
8282
private static final Logger logger = LogManager.getLogger(DiskThresholdMonitor.class);
8383
private final DiskThresholdSettings diskThresholdSettings;
8484
private final Client client;
85+
private final ClusterService clusterService;
8586
private final Supplier<ClusterState> clusterStateSupplier;
8687
private final LongSupplier currentTimeMillisSupplier;
8788
private final RerouteService rerouteService;
@@ -107,17 +108,16 @@ public class DiskThresholdMonitor {
107108
private final Set<String> nodesOverHighThresholdAndRelocating = Sets.newConcurrentHashSet();
108109

109110
public DiskThresholdMonitor(
110-
Settings settings,
111-
Supplier<ClusterState> clusterStateSupplier,
112-
ClusterSettings clusterSettings,
111+
ClusterService clusterService,
113112
Client client,
114113
LongSupplier currentTimeMillisSupplier,
115114
RerouteService rerouteService
116115
) {
117-
this.clusterStateSupplier = clusterStateSupplier;
116+
this.clusterService = clusterService;
117+
this.clusterStateSupplier = clusterService::state;
118118
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
119119
this.rerouteService = rerouteService;
120-
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
120+
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
121121
this.client = client;
122122
}
123123

@@ -158,6 +158,7 @@ public void onNewInfo(ClusterInfo info) {
158158
final Set<String> indicesToMarkReadOnly = new HashSet<>();
159159
RoutingNodes routingNodes = state.getRoutingNodes();
160160
Set<String> indicesNotToAutoRelease = new HashSet<>();
161+
final Set<String> indicesToRemoveReadOnly = new HashSet<>();
161162
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);
162163

163164
final List<DiskUsage> usagesOverHighThreshold = new ArrayList<>();
@@ -255,8 +256,10 @@ public void onNewInfo(ClusterInfo info) {
255256
}
256257

257258
} else {
258-
for (Map.Entry<String, Set<ClusterBlock>> indexBlockEntry : state.blocks().indices().entrySet()) {
259-
state.blocks().removeIndexBlock(indexBlockEntry.getKey(), IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
259+
if (routingNode != null) {
260+
for (ShardRouting routing : routingNode) {
261+
indicesToRemoveReadOnly.add(routing.getIndexName());
262+
}
260263
}
261264

262265
nodesOverHighThresholdAndRelocating.remove(node);
@@ -290,6 +293,29 @@ public void onNewInfo(ClusterInfo info) {
290293
}
291294
}
292295

296+
// remove read-only blocks for indices.
297+
// Remove the read-only-allow-delete block from every index collected in indicesToRemoveReadOnly.
// The change goes through a cluster state update task instead of mutating ClusterBlocks in place.
if (!indicesToRemoveReadOnly.isEmpty()) {
    clusterService.submitStateUpdateTask("disk-usage-recovered", new ClusterStateUpdateTask() {
        /**
         * Builds the new cluster state with the read-only-allow-delete block removed from the recovered indices.
         *
         * @param currentState the state handed to the task when it actually runs
         * @return a copy of {@code currentState} whose blocks no longer contain the removed index blocks
         */
        @Override
        public ClusterState execute(ClusterState currentState) {
            // Start from currentState, not the snapshot captured by onNewInfo: the captured state may be
            // stale by the time this task runs, and building on it would overwrite newer changes.
            final ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
            for (String index : indicesToRemoveReadOnly) {
                clusterBlocksBuilder.removeIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
            }
            // Only the blocks change. Metadata is carried over untouched; rebuilding it from just the
            // cluster UUID and coordination metadata would drop everything else it contains.
            return ClusterState.builder(currentState).blocks(clusterBlocksBuilder.build()).build();
        }

        /**
         * Logs the failure; the block removal is not retried here.
         */
        @Override
        public void onFailure(String source, Exception e) {
            logger.error("failed to update cluster state for disk usage recovery task", e);
        }
    });
}
318+
293319
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4);
294320

295321
if (reroute) {

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,9 +1211,7 @@ protected Node(
12111211
);
12121212

12131213
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
1214-
settings,
1215-
clusterService::state,
1216-
clusterService.getClusterSettings(),
1214+
clusterService,
12171215
client,
12181216
threadPool::relativeTimeInMillis,
12191217
rerouteService

server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
import org.opensearch.cluster.routing.RoutingNode;
4848
import org.opensearch.cluster.routing.RoutingTable;
4949
import org.opensearch.cluster.routing.ShardRoutingState;
50+
import org.opensearch.cluster.service.ClusterApplierService;
51+
import org.opensearch.cluster.service.ClusterManagerService;
52+
import org.opensearch.cluster.service.ClusterService;
5053
import org.opensearch.common.Priority;
5154
import org.opensearch.common.settings.ClusterSettings;
5255
import org.opensearch.common.settings.Settings;
@@ -72,6 +75,8 @@
7275
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE;
7376
import static org.hamcrest.Matchers.contains;
7477
import static org.hamcrest.Matchers.equalTo;
78+
import static org.mockito.Mockito.mock;
79+
import static org.mockito.Mockito.when;
7580

7681
public class DiskThresholdMonitorTests extends OpenSearchAllocationTestCase {
7782

@@ -116,9 +121,7 @@ public void testMarkFloodStageIndicesReadOnly() {
116121
AtomicReference<Set<String>> indices = new AtomicReference<>();
117122
AtomicLong currentTime = new AtomicLong();
118123
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
119-
Settings.EMPTY,
120-
() -> clusterState,
121-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
124+
createClusterService(Settings.EMPTY, clusterState),
122125
null,
123126
currentTime::get,
124127
(reason, priority, listener) -> {
@@ -178,9 +181,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
178181
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
179182

180183
monitor = new DiskThresholdMonitor(
181-
Settings.EMPTY,
182-
() -> anotherFinalClusterState,
183-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
184+
createClusterService(Settings.EMPTY, anotherFinalClusterState),
184185
null,
185186
currentTime::get,
186187
(reason, priority, listener) -> {
@@ -219,9 +220,7 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() {
219220
AtomicLong currentTime = new AtomicLong();
220221
AtomicReference<ActionListener<ClusterState>> listenerReference = new AtomicReference<>();
221222
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
222-
Settings.EMPTY,
223-
() -> clusterState,
224-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
223+
createClusterService(Settings.EMPTY, clusterState),
225224
null,
226225
currentTime::get,
227226
(reason, priority, listener) -> {
@@ -360,9 +359,7 @@ public void testAutoReleaseIndices() {
360359
);
361360

362361
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
363-
Settings.EMPTY,
364-
() -> clusterState,
365-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
362+
createClusterService(Settings.EMPTY, clusterState),
366363
null,
367364
() -> 0L,
368365
(reason, priority, listener) -> {
@@ -422,9 +419,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
422419

423420
assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
424421
monitor = new DiskThresholdMonitor(
425-
Settings.EMPTY,
426-
() -> clusterStateWithBlocks,
427-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
422+
createClusterService(Settings.EMPTY, clusterStateWithBlocks),
428423
null,
429424
() -> 0L,
430425
(reason, priority, listener) -> {
@@ -539,9 +534,7 @@ public long getAsLong() {
539534
final AtomicLong relocatingShardSizeRef = new AtomicLong();
540535

541536
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
542-
Settings.EMPTY,
543-
clusterStateRef::get,
544-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
537+
createClusterService(Settings.EMPTY, clusterState),
545538
null,
546539
timeSupplier,
547540
(reason, priority, listener) -> listener.onResponse(clusterStateRef.get())
@@ -687,9 +680,7 @@ public void testIndexCreateBlockWhenNoDataNodeHealthy() {
687680
AtomicLong currentTime = new AtomicLong();
688681
Settings settings = Settings.builder().build();
689682
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
690-
settings,
691-
() -> clusterState,
692-
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
683+
createClusterService(settings, clusterState),
693684
null,
694685
currentTime::get,
695686
(reason, priority, listener) -> {
@@ -766,9 +757,7 @@ public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() {
766757
AtomicLong currentTime = new AtomicLong();
767758
Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build();
768759
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
769-
settings,
770-
() -> clusterState,
771-
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
760+
createClusterService(settings, clusterState),
772761
null,
773762
currentTime::get,
774763
(reason, priority, listener) -> {
@@ -905,4 +894,10 @@ private static ClusterInfo clusterInfo(
905894
return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of());
906895
}
907896

897+
private static ClusterService createClusterService(Settings settings, ClusterState clusterState) {
898+
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
899+
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
900+
when(clusterApplierService.state()).thenReturn(clusterState);
901+
return new ClusterService(settings, clusterSettings, mock(ClusterManagerService.class), clusterApplierService);
902+
}
908903
}

0 commit comments

Comments
 (0)