Skip to content

Commit 630da1c

Browse files
[Remote Store] Make translog transfer timeout configurable using dynamic setting (#12704) (#13425)
Signed-off-by: Sachin Kale <kalsac@amazon.com> Signed-off-by: Ashish Singh <ssashish@amazon.com> Co-authored-by: Sachin Kale <sachinpkale@gmail.com>
1 parent 4b3b232 commit 630da1c

File tree

13 files changed

+147
-84
lines changed

13 files changed

+147
-84
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063))
1717
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
1818
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
19+
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
1920
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
2021
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
2122
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -733,7 +733,8 @@ public void apply(Settings value, Settings current, Settings previous) {
733733
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
734734

735735
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
736-
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
736+
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
737+
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
737738
)
738739
)
739740
);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4988,7 +4988,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
49884988
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
49894989
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
49904990
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
4991-
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy());
4991+
RemoteFsTranslog.cleanup(repository, shardId, getThreadPool(), indexSettings.getRemoteStorePathStrategy(), remoteStoreSettings);
49924992
}
49934993

49944994
/*
@@ -5011,6 +5011,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
50115011
getThreadPool(),
50125012
shardPath().resolveTranslog(),
50135013
indexSettings.getRemoteStorePathStrategy(),
5014+
remoteStoreSettings,
50145015
logger
50155016
);
50165017
}

server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.index.translog;
1010

1111
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
12+
import org.opensearch.indices.RemoteStoreSettings;
1213
import org.opensearch.repositories.RepositoriesService;
1314
import org.opensearch.repositories.Repository;
1415
import org.opensearch.repositories.RepositoryMissingException;
@@ -34,11 +35,14 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {
3435

3536
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
3637

38+
private final RemoteStoreSettings remoteStoreSettings;
39+
3740
public RemoteBlobStoreInternalTranslogFactory(
3841
Supplier<RepositoriesService> repositoriesServiceSupplier,
3942
ThreadPool threadPool,
4043
String repositoryName,
41-
RemoteTranslogTransferTracker remoteTranslogTransferTracker
44+
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
45+
RemoteStoreSettings remoteStoreSettings
4246
) {
4347
Repository repository;
4448
try {
@@ -49,6 +53,7 @@ public RemoteBlobStoreInternalTranslogFactory(
4953
this.repository = repository;
5054
this.threadPool = threadPool;
5155
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
56+
this.remoteStoreSettings = remoteStoreSettings;
5257
}
5358

5459
@Override
@@ -74,7 +79,8 @@ public Translog newTranslog(
7479
blobStoreRepository,
7580
threadPool,
7681
startedPrimarySupplier,
77-
remoteTranslogTransferTracker
82+
remoteTranslogTransferTracker,
83+
remoteStoreSettings
7884
);
7985
}
8086

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.index.translog.transfer.TranslogTransferManager;
3030
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
3131
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
32+
import org.opensearch.indices.RemoteStoreSettings;
3233
import org.opensearch.repositories.Repository;
3334
import org.opensearch.repositories.blobstore.BlobStoreRepository;
3435
import org.opensearch.threadpool.ThreadPool;
@@ -101,7 +102,8 @@ public RemoteFsTranslog(
101102
BlobStoreRepository blobStoreRepository,
102103
ThreadPool threadPool,
103104
BooleanSupplier startedPrimarySupplier,
104-
RemoteTranslogTransferTracker remoteTranslogTransferTracker
105+
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
106+
RemoteStoreSettings remoteStoreSettings
105107
) throws IOException {
106108
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
107109
logger = Loggers.getLogger(getClass(), shardId);
@@ -114,7 +116,8 @@ public RemoteFsTranslog(
114116
shardId,
115117
fileTransferTracker,
116118
remoteTranslogTransferTracker,
117-
indexSettings().getRemoteStorePathStrategy()
119+
indexSettings().getRemoteStorePathStrategy(),
120+
remoteStoreSettings
118121
);
119122
try {
120123
download(translogTransferManager, location, logger);
@@ -164,6 +167,7 @@ public static void download(
164167
ThreadPool threadPool,
165168
Path location,
166169
RemoteStorePathStrategy pathStrategy,
170+
RemoteStoreSettings remoteStoreSettings,
167171
Logger logger
168172
) throws IOException {
169173
assert repository instanceof BlobStoreRepository : String.format(
@@ -182,7 +186,8 @@ public static void download(
182186
shardId,
183187
fileTransferTracker,
184188
remoteTranslogTransferTracker,
185-
pathStrategy
189+
pathStrategy,
190+
remoteStoreSettings
186191
);
187192
RemoteFsTranslog.download(translogTransferManager, location, logger);
188193
logger.trace(remoteTranslogTransferTracker.toString());
@@ -282,7 +287,8 @@ public static TranslogTransferManager buildTranslogTransferManager(
282287
ShardId shardId,
283288
FileTransferTracker fileTransferTracker,
284289
RemoteTranslogTransferTracker tracker,
285-
RemoteStorePathStrategy pathStrategy
290+
RemoteStorePathStrategy pathStrategy,
291+
RemoteStoreSettings remoteStoreSettings
286292
) {
287293
assert Objects.nonNull(pathStrategy);
288294
String indexUUID = shardId.getIndex().getUUID();
@@ -304,7 +310,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
304310
.build();
305311
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
306312
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
307-
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker);
313+
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
308314
}
309315

310316
@Override
@@ -576,8 +582,13 @@ private void deleteStaleRemotePrimaryTerms() {
576582
}
577583
}
578584

579-
public static void cleanup(Repository repository, ShardId shardId, ThreadPool threadPool, RemoteStorePathStrategy pathStrategy)
580-
throws IOException {
585+
public static void cleanup(
586+
Repository repository,
587+
ShardId shardId,
588+
ThreadPool threadPool,
589+
RemoteStorePathStrategy pathStrategy,
590+
RemoteStoreSettings remoteStoreSettings
591+
) throws IOException {
581592
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
582593
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
583594
// We use a dummy stats tracker to ensure the flow doesn't break.
@@ -590,7 +601,8 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th
590601
shardId,
591602
fileTransferTracker,
592603
remoteTranslogTransferTracker,
593-
pathStrategy
604+
pathStrategy,
605+
remoteStoreSettings
594606
);
595607
// clean up all remote translog files
596608
translogTransferManager.deleteTranslogFiles();

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
2929
import org.opensearch.index.translog.Translog;
3030
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
31+
import org.opensearch.indices.RemoteStoreSettings;
3132
import org.opensearch.threadpool.ThreadPool;
3233

3334
import java.io.IOException;
@@ -60,9 +61,7 @@ public class TranslogTransferManager {
6061
private final BlobPath remoteMetadataTransferPath;
6162
private final FileTransferTracker fileTransferTracker;
6263
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
63-
64-
private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;
65-
64+
private final RemoteStoreSettings remoteStoreSettings;
6665
private static final int METADATA_FILES_TO_FETCH = 10;
6766

6867
private final Logger logger;
@@ -79,7 +78,8 @@ public TranslogTransferManager(
7978
BlobPath remoteDataTransferPath,
8079
BlobPath remoteMetadataTransferPath,
8180
FileTransferTracker fileTransferTracker,
82-
RemoteTranslogTransferTracker remoteTranslogTransferTracker
81+
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
82+
RemoteStoreSettings remoteStoreSettings
8383
) {
8484
this.shardId = shardId;
8585
this.transferService = transferService;
@@ -88,6 +88,7 @@ public TranslogTransferManager(
8888
this.fileTransferTracker = fileTransferTracker;
8989
this.logger = Loggers.getLogger(getClass(), shardId);
9090
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
91+
this.remoteStoreSettings = remoteStoreSettings;
9192
}
9293

9394
public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() {
@@ -151,7 +152,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
151152
transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH);
152153

153154
try {
154-
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
155+
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
155156
Exception ex = new TranslogUploadFailedException(
156157
"Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"
157158
);

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,8 @@ protected void closeInternal() {
517517
repositoriesServiceSupplier,
518518
threadPool,
519519
remoteStoreStatsTrackerFactory,
520-
settings
520+
settings,
521+
remoteStoreSettings
521522
);
522523
this.searchRequestStats = searchRequestStats;
523524
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
@@ -547,22 +548,25 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
547548
Supplier<RepositoriesService> repositoriesServiceSupplier,
548549
ThreadPool threadPool,
549550
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
550-
Settings settings
551+
Settings settings,
552+
RemoteStoreSettings remoteStoreSettings
551553
) {
552554
return (indexSettings, shardRouting) -> {
553555
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
554556
return new RemoteBlobStoreInternalTranslogFactory(
555557
repositoriesServiceSupplier,
556558
threadPool,
557559
indexSettings.getRemoteStoreTranslogRepository(),
558-
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
560+
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
561+
remoteStoreSettings
559562
);
560563
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
561564
return new RemoteBlobStoreInternalTranslogFactory(
562565
repositoriesServiceSupplier,
563566
threadPool,
564567
RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
565-
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId())
568+
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
569+
remoteStoreSettings
566570
);
567571
}
568572
return new InternalTranslogFactory();

server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,20 @@ public class RemoteStoreSettings {
5454
Property.Dynamic
5555
);
5656

57+
/**
58+
* Controls timeout value while uploading translog and checkpoint files to remote translog
59+
*/
60+
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
61+
"cluster.remote_store.translog.transfer_timeout",
62+
TimeValue.timeValueSeconds(30),
63+
TimeValue.timeValueSeconds(30),
64+
Property.NodeScope,
65+
Property.Dynamic
66+
);
67+
5768
private volatile TimeValue clusterRemoteTranslogBufferInterval;
5869
private volatile int minRemoteSegmentMetadataFiles;
70+
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
5971

6072
public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
6173
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
@@ -69,9 +81,14 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
6981
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
7082
this::setMinRemoteSegmentMetadataFiles
7183
);
84+
85+
this.clusterRemoteTranslogTransferTimeout = CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING.get(settings);
86+
clusterSettings.addSettingsUpdateConsumer(
87+
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
88+
this::setClusterRemoteTranslogTransferTimeout
89+
);
7290
}
7391

74-
// Exclusively for testing, please do not use it elsewhere.
7592
public TimeValue getClusterRemoteTranslogBufferInterval() {
7693
return clusterRemoteTranslogBufferInterval;
7794
}
@@ -87,4 +104,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles)
87104
public int getMinRemoteSegmentMetadataFiles() {
88105
return this.minRemoteSegmentMetadataFiles;
89106
}
107+
108+
public TimeValue getClusterRemoteTranslogTransferTimeout() {
109+
return clusterRemoteTranslogTransferTimeout;
110+
}
111+
112+
private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
113+
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
114+
}
90115
}

server/src/test/java/org/opensearch/index/IndexModuleTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
236236
repositoriesServiceReference::get,
237237
threadPool,
238238
indexSettings.getRemoteStoreTranslogRepository(),
239-
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10)
239+
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10),
240+
DefaultRemoteStoreSettings.INSTANCE
240241
);
241242
}
242243
return new InternalTranslogFactory();

server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.index.translog.transfer.TranslogTransferManager;
5151
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
5252
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
53+
import org.opensearch.indices.DefaultRemoteStoreSettings;
5354
import org.opensearch.indices.recovery.RecoverySettings;
5455
import org.opensearch.indices.replication.common.ReplicationType;
5556
import org.opensearch.repositories.blobstore.BlobStoreRepository;
@@ -188,7 +189,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
188189
repository,
189190
threadPool,
190191
primaryMode::get,
191-
new RemoteTranslogTransferTracker(shardId, 10)
192+
new RemoteTranslogTransferTracker(shardId, 10),
193+
DefaultRemoteStoreSettings.INSTANCE
192194
);
193195
}
194196

@@ -459,7 +461,8 @@ public void testExtraGenToKeep() throws Exception {
459461
repository,
460462
threadPool,
461463
() -> Boolean.TRUE,
462-
new RemoteTranslogTransferTracker(shardId, 10)
464+
new RemoteTranslogTransferTracker(shardId, 10),
465+
DefaultRemoteStoreSettings.INSTANCE
463466
) {
464467
@Override
465468
ChannelFactory getChannelFactory() {
@@ -1508,7 +1511,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
15081511
repository,
15091512
threadPool,
15101513
() -> Boolean.TRUE,
1511-
new RemoteTranslogTransferTracker(shardId, 10)
1514+
new RemoteTranslogTransferTracker(shardId, 10),
1515+
DefaultRemoteStoreSettings.INSTANCE
15121516
) {
15131517
@Override
15141518
ChannelFactory getChannelFactory() {
@@ -1616,7 +1620,8 @@ public void force(boolean metaData) throws IOException {
16161620
repository,
16171621
threadPool,
16181622
() -> Boolean.TRUE,
1619-
new RemoteTranslogTransferTracker(shardId, 10)
1623+
new RemoteTranslogTransferTracker(shardId, 10),
1624+
DefaultRemoteStoreSettings.INSTANCE
16201625
) {
16211626
@Override
16221627
ChannelFactory getChannelFactory() {

0 commit comments

Comments
 (0)