Skip to content

Commit 4f29d7e

Browse files
dhwanilpatel authored and shiv0408 committed
Use async write for manifest file and use latch for timeout (opensearch-project#10968)
* Use async write for manifest file and use latch for timeout
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent b0342ef commit 4f29d7e

File tree

3 files changed

+137
-39
lines changed

3 files changed

+137
-39
lines changed

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,7 @@ public void apply(Settings value, Settings current, Settings previous) {
686686
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
687687
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
688688
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
689+
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
689690
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
690691
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
691692
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class RemoteClusterStateService implements Closeable {
8787

8888
public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);
8989

90+
public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);
91+
9092
public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
9193
"cluster.remote_store.state.index_metadata.upload_timeout",
9294
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
@@ -101,6 +103,13 @@ public class RemoteClusterStateService implements Closeable {
101103
Setting.Property.NodeScope
102104
);
103105

106+
public static final Setting<TimeValue> METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
107+
"cluster.remote_store.state.metadata_manifest.upload_timeout",
108+
METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
109+
Setting.Property.Dynamic,
110+
Setting.Property.NodeScope
111+
);
112+
104113
public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
105114
"index-metadata",
106115
METADATA_NAME_FORMAT,
@@ -157,6 +166,7 @@ public class RemoteClusterStateService implements Closeable {
157166

158167
private volatile TimeValue indexMetadataUploadTimeout;
159168
private volatile TimeValue globalMetadataUploadTimeout;
169+
private volatile TimeValue metadataManifestUploadTimeout;
160170

161171
private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
162172
private final RemotePersistenceStats remoteStateStats;
@@ -190,9 +200,11 @@ public RemoteClusterStateService(
190200
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
191201
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
192202
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
203+
this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING);
193204
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
194205
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
195206
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
207+
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
196208
this.remoteStateStats = new RemotePersistenceStats();
197209
}
198210

@@ -401,21 +413,21 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
401413
try {
402414
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
403415
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
404-
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
416+
RemoteStateTransferException ex = new RemoteStateTransferException(
405417
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
406418
);
407419
throw ex;
408420
}
409421
} catch (InterruptedException ex) {
410-
GlobalMetadataTransferException exception = new GlobalMetadataTransferException(
422+
RemoteStateTransferException exception = new RemoteStateTransferException(
411423
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"),
412424
ex
413425
);
414426
Thread.currentThread().interrupt();
415427
throw exception;
416428
}
417429
if (exceptionReference.get() != null) {
418-
throw new GlobalMetadataTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
430+
throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
419431
}
420432
return result.get();
421433
}
@@ -440,7 +452,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
440452
);
441453
result.add(uploadedIndexMetadata);
442454
}, ex -> {
443-
assert ex instanceof IndexMetadataTransferException;
455+
assert ex instanceof RemoteStateTransferException;
444456
logger.error(
445457
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
446458
ex
@@ -457,7 +469,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
457469

458470
try {
459471
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
460-
IndexMetadataTransferException ex = new IndexMetadataTransferException(
472+
RemoteStateTransferException ex = new RemoteStateTransferException(
461473
String.format(
462474
Locale.ROOT,
463475
"Timed out waiting for transfer of index metadata to complete - %s",
@@ -469,7 +481,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
469481
}
470482
} catch (InterruptedException ex) {
471483
exceptionList.forEach(ex::addSuppressed);
472-
IndexMetadataTransferException exception = new IndexMetadataTransferException(
484+
RemoteStateTransferException exception = new RemoteStateTransferException(
473485
String.format(
474486
Locale.ROOT,
475487
"Timed out waiting for transfer of index metadata to complete - %s",
@@ -481,7 +493,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
481493
throw exception;
482494
}
483495
if (exceptionList.size() > 0) {
484-
IndexMetadataTransferException exception = new IndexMetadataTransferException(
496+
RemoteStateTransferException exception = new RemoteStateTransferException(
485497
String.format(
486498
Locale.ROOT,
487499
"Exception during transfer of IndexMetadata to Remote %s",
@@ -520,7 +532,7 @@ private void writeIndexMetadataAsync(
520532
indexMetadataContainer.path().buildAsString() + indexMetadataFilename
521533
)
522534
),
523-
ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex))
535+
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().toString(), ex))
524536
);
525537

526538
INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority(
@@ -601,14 +613,45 @@ private ClusterMetadataManifest uploadManifest(
601613

602614
private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
603615
throws IOException {
616+
AtomicReference<String> result = new AtomicReference<String>();
617+
AtomicReference<Exception> exceptionReference = new AtomicReference<Exception>();
618+
604619
final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
605-
CLUSTER_METADATA_MANIFEST_FORMAT.write(
620+
621+
// latch to wait until the manifest upload has finished
622+
CountDownLatch latch = new CountDownLatch(1);
623+
624+
LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> {
625+
logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully."));
626+
}, ex -> { exceptionReference.set(ex); }), latch);
627+
628+
CLUSTER_METADATA_MANIFEST_FORMAT.writeAsyncWithUrgentPriority(
606629
uploadManifest,
607630
metadataManifestContainer,
608631
fileName,
609632
blobStoreRepository.getCompressor(),
633+
completionListener,
610634
FORMAT_PARAMS
611635
);
636+
637+
try {
638+
if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
639+
RemoteStateTransferException ex = new RemoteStateTransferException(
640+
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete")
641+
);
642+
throw ex;
643+
}
644+
} catch (InterruptedException ex) {
645+
RemoteStateTransferException exception = new RemoteStateTransferException(
646+
String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"),
647+
ex
648+
);
649+
Thread.currentThread().interrupt();
650+
throw exception;
651+
}
652+
if (exceptionReference.get() != null) {
653+
throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get());
654+
}
612655
logger.debug(
613656
"Metadata manifest file [{}] written during [{}] phase. ",
614657
fileName,
@@ -668,6 +711,10 @@ private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTim
668711
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
669712
}
670713

714+
private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) {
715+
this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout;
716+
}
717+
671718
public TimeValue getIndexMetadataUploadTimeout() {
672719
return this.indexMetadataUploadTimeout;
673720
}
@@ -676,6 +723,10 @@ public TimeValue getGlobalMetadataUploadTimeout() {
676723
return this.globalMetadataUploadTimeout;
677724
}
678725

726+
public TimeValue getMetadataManifestUploadTimeout() {
727+
return this.metadataManifestUploadTimeout;
728+
}
729+
679730
static String getManifestFileName(long term, long version, boolean committed) {
680731
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
681732
return String.join(
@@ -1088,29 +1139,15 @@ public void writeMetadataFailed() {
10881139
}
10891140

10901141
/**
1091-
* Exception for IndexMetadata transfer failures to remote
1092-
*/
1093-
static class IndexMetadataTransferException extends RuntimeException {
1094-
1095-
public IndexMetadataTransferException(String errorDesc) {
1096-
super(errorDesc);
1097-
}
1098-
1099-
public IndexMetadataTransferException(String errorDesc, Throwable cause) {
1100-
super(errorDesc, cause);
1101-
}
1102-
}
1103-
1104-
/**
1105-
* Exception for GlobalMetadata transfer failures to remote
1142+
* Exception for Remote state transfer.
11061143
*/
1107-
static class GlobalMetadataTransferException extends RuntimeException {
1144+
static class RemoteStateTransferException extends RuntimeException {
11081145

1109-
public GlobalMetadataTransferException(String errorDesc) {
1146+
public RemoteStateTransferException(String errorDesc) {
11101147
super(errorDesc);
11111148
}
11121149

1113-
public GlobalMetadataTransferException(String errorDesc, Throwable cause) {
1150+
public RemoteStateTransferException(String errorDesc, Throwable cause) {
11141151
super(errorDesc, cause);
11151152
}
11161153
}

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.concurrent.CountDownLatch;
6969
import java.util.concurrent.TimeUnit;
7070
import java.util.concurrent.atomic.AtomicInteger;
71+
import java.util.concurrent.atomic.AtomicReference;
7172
import java.util.function.Function;
7273
import java.util.function.Supplier;
7374
import java.util.stream.Stream;
@@ -230,10 +231,17 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {
230231

231232
ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
232233
ArgumentCaptor<WriteContext> writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class);
233-
234+
AtomicReference<WriteContext> capturedWriteContext = new AtomicReference<>();
234235
doAnswer((i) -> {
235236
actionListenerArgumentCaptor.getValue().onResponse(null);
236237
return null;
238+
}).doAnswer((i) -> {
239+
actionListenerArgumentCaptor.getValue().onResponse(null);
240+
capturedWriteContext.set(writeContextArgumentCaptor.getValue());
241+
return null;
242+
}).doAnswer((i) -> {
243+
actionListenerArgumentCaptor.getValue().onResponse(null);
244+
return null;
237245
}).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());
238246

239247
remoteClusterStateService.start();
@@ -262,27 +270,30 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException {
262270
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
263271
assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID()));
264272

265-
assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2);
266-
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2);
273+
assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 3);
274+
assertEquals(writeContextArgumentCaptor.getAllValues().size(), 3);
267275

268-
WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue();
269-
byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes();
276+
byte[] writtenBytes = capturedWriteContext.get()
277+
.getStreamProvider(Integer.MAX_VALUE)
278+
.provideStream(0)
279+
.getInputStream()
280+
.readAllBytes();
270281
IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize(
271-
capturedWriteContext.getFileName(),
282+
capturedWriteContext.get().getFileName(),
272283
blobStoreRepository.getNamedXContentRegistry(),
273284
new BytesArray(writtenBytes)
274285
);
275286

276-
assertEquals(capturedWriteContext.getWritePriority(), WritePriority.URGENT);
287+
assertEquals(capturedWriteContext.get().getWritePriority(), WritePriority.URGENT);
277288
assertEquals(writtenIndexMetadata.getNumberOfShards(), 1);
278289
assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0);
279290
assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index");
280291
assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid");
281292
long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8);
282-
if (capturedWriteContext.doRemoteDataIntegrityCheck()) {
283-
assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum);
293+
if (capturedWriteContext.get().doRemoteDataIntegrityCheck()) {
294+
assertEquals(capturedWriteContext.get().getExpectedChecksum().longValue(), expectedChecksum);
284295
} else {
285-
assertEquals(capturedWriteContext.getExpectedChecksum(), null);
296+
assertEquals(capturedWriteContext.get().getExpectedChecksum(), null);
286297
}
287298

288299
}
@@ -306,11 +317,44 @@ public void run() {
306317

307318
remoteClusterStateService.start();
308319
assertThrows(
309-
RemoteClusterStateService.GlobalMetadataTransferException.class,
320+
RemoteClusterStateService.RemoteStateTransferException.class,
310321
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
311322
);
312323
}
313324

325+
public void testTimeoutWhileWritingManifestFile() throws IOException {
326+
// verify update metadata manifest upload timeout
327+
int metadataManifestUploadTimeout = 2;
328+
Settings newSettings = Settings.builder()
329+
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
330+
.build();
331+
clusterSettings.applySettings(newSettings);
332+
333+
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
334+
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);
335+
336+
ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
337+
338+
doAnswer((i) -> { // For Global Metadata
339+
actionListenerArgumentCaptor.getValue().onResponse(null);
340+
return null;
341+
}).doAnswer((i) -> { // For Index Metadata
342+
actionListenerArgumentCaptor.getValue().onResponse(null);
343+
return null;
344+
}).doAnswer((i) -> {
345+
// For Manifest file perform No Op, so latch in code will timeout
346+
return null;
347+
}).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture());
348+
349+
remoteClusterStateService.start();
350+
try {
351+
remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10));
352+
} catch (Exception e) {
353+
assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException);
354+
assertTrue(e.getMessage().contains("Timed out waiting for transfer of manifest file to complete"));
355+
}
356+
}
357+
314358
public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException {
315359
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
316360
AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class);
@@ -327,7 +371,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx
327371

328372
remoteClusterStateService.start();
329373
assertThrows(
330-
RemoteClusterStateService.IndexMetadataTransferException.class,
374+
RemoteClusterStateService.RemoteStateTransferException.class,
331375
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10))
332376
);
333377
assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
@@ -1142,6 +1186,22 @@ public void testIndexMetadataUploadWaitTimeSetting() {
11421186
assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds());
11431187
}
11441188

1189+
public void testMetadataManifestUploadWaitTimeSetting() {
1190+
// verify default value
1191+
assertEquals(
1192+
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT,
1193+
remoteClusterStateService.getMetadataManifestUploadTimeout()
1194+
);
1195+
1196+
// verify update metadata manifest upload timeout
1197+
int metadataManifestUploadTimeout = randomIntBetween(1, 10);
1198+
Settings newSettings = Settings.builder()
1199+
.put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s")
1200+
.build();
1201+
clusterSettings.applySettings(newSettings);
1202+
assertEquals(metadataManifestUploadTimeout, remoteClusterStateService.getMetadataManifestUploadTimeout().seconds());
1203+
}
1204+
11451205
public void testGlobalMetadataUploadWaitTimeSetting() {
11461206
// verify default value
11471207
assertEquals(

0 commit comments

Comments
 (0)