Skip to content

Commit a03bb2c

Browse files
varunbharadwajtandonks
authored andcommitted
[Pull-based Ingestion] Support cluster write blocks (opensearch-project#18280)
* Support cluster write blocks in pull-based ingestion Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com> * Update poller from cluster state applier to listener Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com> * Add unit tests Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com> --------- Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent d1df5cd commit a03bb2c

File tree

21 files changed

+293
-46
lines changed

21 files changed

+293
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))
2424
- Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250)))
2525
- Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([18270](https://github.com/opensearch-project/OpenSearch/pull18270)))
26+
- Support cluster write block in pull-based ingestion ([#18280](https://github.com/opensearch-project/OpenSearch/pull/18280)))
2627
- Supporting Scripted Metric Aggregation when reducing aggregations in InternalValueCount and InternalAvg ([18288](https://github.com/opensearch-project/OpenSearch/pull18270)))
2728

2829
### Changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,4 +204,12 @@ protected void recreateKafkaTopics(int numKafkaPartitions) {
204204
cleanup();
205205
setupKafka(numKafkaPartitions);
206206
}
207+
208+
protected void setWriteBlock(String indexName, boolean isWriteBlockEnabled) {
209+
client().admin()
210+
.indices()
211+
.prepareUpdateSettings(indexName)
212+
.setSettings(Settings.builder().put("index.blocks.write", isWriteBlockEnabled))
213+
.get();
214+
}
207215
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,55 @@ public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
449449

450450
}
451451

452+
public void testClusterWriteBlock() throws Exception {
453+
// setup nodes and index
454+
produceData("1", "name1", "24");
455+
produceData("2", "name2", "20");
456+
internalCluster().startClusterManagerOnlyNode();
457+
final String nodeA = internalCluster().startDataOnlyNode();
458+
final String nodeB = internalCluster().startDataOnlyNode();
459+
460+
createIndexWithDefaultSettings(1, 1);
461+
ensureGreen(indexName);
462+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
463+
464+
// create a write block
465+
setWriteBlock(indexName, true);
466+
waitForState(() -> {
467+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
468+
return ingestionState.getFailedShards() == 0
469+
&& Arrays.stream(ingestionState.getShardStates())
470+
.allMatch(state -> state.isWriteBlockEnabled() && state.pollerState().equalsIgnoreCase("paused"));
471+
});
472+
473+
// verify write block state in poller is persisted
474+
produceData("3", "name3", "30");
475+
produceData("4", "name4", "31");
476+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
477+
ensureYellowAndNoInitializingShards(indexName);
478+
assertTrue(nodeB.equals(primaryNodeName(indexName)));
479+
480+
final String nodeC = internalCluster().startDataOnlyNode();
481+
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
482+
ensureGreen(indexName);
483+
assertTrue(nodeC.equals(replicaNodeName(indexName)));
484+
waitForState(() -> {
485+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
486+
return Arrays.stream(ingestionState.getShardStates())
487+
.allMatch(state -> state.isWriteBlockEnabled() && state.pollerState().equalsIgnoreCase("paused"));
488+
});
489+
assertEquals(2, getSearchableDocCount(nodeB));
490+
491+
// remove write block
492+
setWriteBlock(indexName, false);
493+
waitForState(() -> {
494+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
495+
return ingestionState.getFailedShards() == 0
496+
&& Arrays.stream(ingestionState.getShardStates()).allMatch(state -> state.isWriteBlockEnabled() == false);
497+
});
498+
waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
499+
}
500+
452501
private void verifyRemoteStoreEnabled(String node) {
453502
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
454503
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
662662
wrapper,
663663
getInstanceFromNode(CircuitBreakerService.class),
664664
env.nodeId(),
665+
getInstanceFromNode(ClusterService.class),
665666
listener
666667
);
667668
shardRef.set(newShard);
@@ -688,6 +689,7 @@ public static final IndexShard newIndexShard(
688689
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper,
689690
final CircuitBreakerService cbs,
690691
final String nodeId,
692+
final ClusterService clusterService,
691693
final IndexingOperationListener... listeners
692694
) throws IOException {
693695
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
@@ -726,7 +728,8 @@ public static final IndexShard newIndexShard(
726728
false,
727729
OpenSearchTestCase::randomBoolean,
728730
() -> indexService.getIndexSettings().getRefreshInterval(),
729-
indexService.getRefreshMutex()
731+
indexService.getRefreshMutex(),
732+
clusterService.getClusterApplierService()
730733
);
731734
}
732735

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,36 +28,37 @@
2828
* @opensearch.experimental
2929
*/
3030
@ExperimentalApi
31-
public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused)
32-
implements
33-
Writeable,
34-
ToXContentFragment {
31+
public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused,
32+
boolean isWriteBlockEnabled) implements Writeable, ToXContentFragment {
3533

3634
private static final String SHARD = "shard";
3735
private static final String POLLER_STATE = "poller_state";
3836
private static final String ERROR_POLICY = "error_policy";
3937
private static final String POLLER_PAUSED = "poller_paused";
38+
private static final String WRITE_BLOCK_ENABLED = "write_block_enabled";
4039

4140
public ShardIngestionState() {
42-
this("", -1, "", "", false);
41+
this("", -1, "", "", false, false);
4342
}
4443

4544
public ShardIngestionState(StreamInput in) throws IOException {
46-
this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean());
45+
this(in.readString(), in.readVInt(), in.readOptionalString(), in.readOptionalString(), in.readBoolean(), in.readBoolean());
4746
}
4847

4948
public ShardIngestionState(
5049
String index,
5150
int shardId,
5251
@Nullable String pollerState,
5352
@Nullable String errorPolicy,
54-
boolean isPollerPaused
53+
boolean isPollerPaused,
54+
boolean isWriteBlockEnabled
5555
) {
5656
this.index = index;
5757
this.shardId = shardId;
5858
this.pollerState = pollerState;
5959
this.errorPolicy = errorPolicy;
6060
this.isPollerPaused = isPollerPaused;
61+
this.isWriteBlockEnabled = isWriteBlockEnabled;
6162
}
6263

6364
@Override
@@ -67,6 +68,7 @@ public void writeTo(StreamOutput out) throws IOException {
6768
out.writeOptionalString(pollerState);
6869
out.writeOptionalString(errorPolicy);
6970
out.writeBoolean(isPollerPaused);
71+
out.writeBoolean(isWriteBlockEnabled);
7072
}
7173

7274
@Override
@@ -76,6 +78,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
7678
builder.field(POLLER_STATE, pollerState);
7779
builder.field(ERROR_POLICY, errorPolicy);
7880
builder.field(POLLER_PAUSED, isPollerPaused);
81+
builder.field(WRITE_BLOCK_ENABLED, isWriteBlockEnabled);
7982
builder.endObject();
8083
return builder;
8184
}

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,8 @@ protected void closeInternal() {
727727
shardLevelRefreshEnabled,
728728
fixedRefreshIntervalSchedulingEnabled,
729729
this::getRefreshInterval,
730-
refreshMutex
730+
refreshMutex,
731+
clusterService.getClusterApplierService()
731732
);
732733
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
733734
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.lucene.search.ReferenceManager;
4242
import org.apache.lucene.search.Sort;
4343
import org.apache.lucene.search.similarities.Similarity;
44+
import org.opensearch.cluster.service.ClusterApplierService;
4445
import org.opensearch.common.Nullable;
4546
import org.opensearch.common.annotation.PublicApi;
4647
import org.opensearch.common.settings.Setting;
@@ -113,6 +114,7 @@ public final class EngineConfig {
113114
private final BooleanSupplier startedPrimarySupplier;
114115
private final Comparator<LeafReader> leafSorter;
115116
private final Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
117+
private final ClusterApplierService clusterApplierService;
116118

117119
/**
118120
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -303,6 +305,7 @@ private EngineConfig(Builder builder) {
303305
this.leafSorter = builder.leafSorter;
304306
this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier;
305307
this.indexReaderWarmer = builder.indexReaderWarmer;
308+
this.clusterApplierService = builder.clusterApplierService;
306309
}
307310

308311
/**
@@ -576,6 +579,13 @@ public Comparator<LeafReader> getLeafSorter() {
576579
return this.leafSorter;
577580
}
578581

582+
/**
583+
* Returns the ClusterApplierService instance.
584+
*/
585+
public ClusterApplierService getClusterApplierService() {
586+
return this.clusterApplierService;
587+
}
588+
579589
/**
580590
* Builder for EngineConfig class
581591
*
@@ -611,6 +621,7 @@ public static class Builder {
611621
private Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
612622
Comparator<LeafReader> leafSorter;
613623
private IndexWriter.IndexReaderWarmer indexReaderWarmer;
624+
private ClusterApplierService clusterApplierService;
614625

615626
public Builder shardId(ShardId shardId) {
616627
this.shardId = shardId;
@@ -757,6 +768,11 @@ public Builder indexReaderWarmer(IndexWriter.IndexReaderWarmer indexReaderWarmer
757768
return this;
758769
}
759770

771+
public Builder clusterApplierService(ClusterApplierService clusterApplierService) {
772+
this.clusterApplierService = clusterApplierService;
773+
return this;
774+
}
775+
760776
public EngineConfig build() {
761777
return new EngineConfig(this);
762778
}

server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.apache.lucene.search.ReferenceManager;
1919
import org.apache.lucene.search.Sort;
2020
import org.apache.lucene.search.similarities.Similarity;
21+
import org.opensearch.cluster.service.ClusterApplierService;
2122
import org.opensearch.common.Nullable;
2223
import org.opensearch.common.unit.TimeValue;
2324
import org.opensearch.core.index.shard.ShardId;
@@ -158,7 +159,8 @@ public EngineConfig newEngineConfig(
158159
TranslogFactory translogFactory,
159160
Comparator<LeafReader> leafSorter,
160161
Supplier<DocumentMapperForType> documentMapperForTypeSupplier,
161-
IndexWriter.IndexReaderWarmer indexReaderWarmer
162+
IndexWriter.IndexReaderWarmer indexReaderWarmer,
163+
ClusterApplierService clusterApplierService
162164
) {
163165
CodecService codecServiceToUse = codecService;
164166
if (codecService == null && this.codecServiceFactory != null) {
@@ -194,6 +196,7 @@ public EngineConfig newEngineConfig(
194196
.leafSorter(leafSorter)
195197
.documentMapperForTypeSupplier(documentMapperForTypeSupplier)
196198
.indexReaderWarmer(indexReaderWarmer)
199+
.clusterApplierService(clusterApplierService)
197200
.build();
198201
}
199202

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.search.IndexSearcher;
1515
import org.opensearch.ExceptionsHelper;
1616
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
17+
import org.opensearch.cluster.block.ClusterBlockLevel;
1718
import org.opensearch.cluster.metadata.IndexMetadata;
1819
import org.opensearch.cluster.metadata.IngestionSource;
1920
import org.opensearch.common.lease.Releasable;
@@ -132,6 +133,19 @@ public void start() {
132133
ingestionSource.getNumProcessorThreads(),
133134
ingestionSource.getBlockingQueueSize()
134135
);
136+
137+
// Register the poller with the ClusterService for receiving cluster state updates.
138+
// Also initialize cluster write block state in the poller.
139+
if (engineConfig.getClusterApplierService() != null) {
140+
engineConfig.getClusterApplierService().addListener(streamPoller);
141+
boolean isWriteBlockEnabled = engineConfig.getClusterApplierService()
142+
.state()
143+
.blocks()
144+
.indexBlocked(ClusterBlockLevel.WRITE, engineConfig.getIndexSettings().getIndex().getName());
145+
streamPoller.setWriteBlockEnabled(isWriteBlockEnabled);
146+
}
147+
148+
// start the polling loop
135149
streamPoller.start();
136150
}
137151

@@ -512,7 +526,9 @@ public ShardIngestionState getIngestionState() {
512526
engineConfig.getShardId().getId(),
513527
streamPoller.getState().toString(),
514528
streamPoller.getErrorStrategy().getName(),
515-
streamPoller.isPaused()
529+
streamPoller.isPaused(),
530+
streamPoller.isWriteBlockEnabled()
531+
516532
);
517533
}
518534
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
7777
import org.opensearch.cluster.routing.ShardRouting;
7878
import org.opensearch.cluster.routing.ShardRoutingState;
79+
import org.opensearch.cluster.service.ClusterApplierService;
7980
import org.opensearch.common.Booleans;
8081
import org.opensearch.common.CheckedConsumer;
8182
import org.opensearch.common.CheckedFunction;
@@ -375,6 +376,7 @@ Runnable getGlobalCheckpointSyncer() {
375376
private final Supplier<TimeValue> refreshInterval;
376377
private final Object refreshMutex;
377378
private volatile AsyncShardRefreshTask refreshTask;
379+
private final ClusterApplierService clusterApplierService;
378380

379381
public IndexShard(
380382
final ShardRouting shardRouting,
@@ -411,7 +413,8 @@ public IndexShard(
411413
final boolean shardLevelRefreshEnabled,
412414
final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
413415
final Supplier<TimeValue> refreshInterval,
414-
final Object refreshMutex
416+
final Object refreshMutex,
417+
final ClusterApplierService clusterApplierService
415418
) throws IOException {
416419
super(shardRouting.shardId(), indexSettings);
417420
assert shardRouting.initializing();
@@ -518,6 +521,7 @@ public boolean shouldCache(Query query) {
518521
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
519522
this.refreshInterval = refreshInterval;
520523
this.refreshMutex = Objects.requireNonNull(refreshMutex);
524+
this.clusterApplierService = clusterApplierService;
521525
synchronized (this.refreshMutex) {
522526
if (shardLevelRefreshEnabled) {
523527
startRefreshTask();
@@ -4127,7 +4131,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
41274131
isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null, // DESC @timestamp default order for
41284132
// timeseries
41294133
() -> docMapper(),
4130-
mergedSegmentWarmerFactory.get(this)
4134+
mergedSegmentWarmerFactory.get(this),
4135+
clusterApplierService
41314136
);
41324137
}
41334138

0 commit comments

Comments
 (0)