Skip to content

Commit 60fee58

Browse files
authored
Attempt to fix ThreadLeakControl issues in S3BlobContainerRetriesTests (#18201)
The Default aws client might create its own ScheduledExecutorService which is closed via ExecutorService.shutdown() when the client is closed. Calling shutdown might not ensure that all threads are gone when ThreadLeakControl is ran. Provide our own ScheduledExecutorService during this test so that it runs thrown our tearDown method which calls ThreadPool#terminate waiting for 5 seconds. Signed-off-by: David Causse <dcausse@wikimedia.org>
1 parent 3118fb6 commit 60fee58

File tree

5 files changed

+48
-16
lines changed

5 files changed

+48
-16
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.nio.file.Path;
5353
import java.time.Duration;
5454
import java.util.Map;
55+
import java.util.concurrent.ScheduledExecutorService;
5556

5657
import static java.util.Collections.emptyMap;
5758

@@ -75,10 +76,20 @@ class S3AsyncService implements Closeable {
7576
*/
7677
private volatile Map<Settings, S3ClientSettings> derivedClientSettings = emptyMap();
7778

78-
S3AsyncService(final Path configPath) {
79+
/**
80+
* Optional scheduled executor service to use for the client
81+
*/
82+
private final @Nullable ScheduledExecutorService clientExecutorService;
83+
84+
S3AsyncService(final Path configPath, @Nullable ScheduledExecutorService clientExecutorService) {
7985
staticClientSettings = MapBuilder.<String, S3ClientSettings>newMapBuilder()
8086
.put("default", S3ClientSettings.getClientSettings(Settings.EMPTY, "default", configPath))
8187
.immutableMap();
88+
this.clientExecutorService = clientExecutorService;
89+
}
90+
91+
S3AsyncService(final Path configPath) {
92+
this(configPath, null);
8293
}
8394

8495
/**
@@ -173,7 +184,7 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
173184
) {
174185
setDefaultAwsProfilePath();
175186
final S3AsyncClientBuilder builder = S3AsyncClient.builder();
176-
builder.overrideConfiguration(buildOverrideConfiguration(clientSettings));
187+
builder.overrideConfiguration(buildOverrideConfiguration(clientSettings, clientExecutorService));
177188
final AwsCredentialsProvider credentials = buildCredentials(logger, clientSettings);
178189
builder.credentialsProvider(credentials);
179190

@@ -234,7 +245,10 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
234245
return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials);
235246
}
236247

237-
static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
248+
static ClientOverrideConfiguration buildOverrideConfiguration(
249+
final S3ClientSettings clientSettings,
250+
ScheduledExecutorService clientExecutorService
251+
) {
238252
RetryPolicy retryPolicy = SocketAccess.doPrivileged(
239253
() -> RetryPolicy.builder()
240254
.numRetries(clientSettings.maxRetries)
@@ -243,11 +257,12 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett
243257
)
244258
.build()
245259
);
260+
ClientOverrideConfiguration.Builder builder = ClientOverrideConfiguration.builder();
261+
if (clientExecutorService != null) {
262+
builder = builder.scheduledExecutorService(clientExecutorService);
263+
}
246264

247-
return ClientOverrideConfiguration.builder()
248-
.retryPolicy(retryPolicy)
249-
.apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis))
250-
.build();
265+
return builder.retryPolicy(retryPolicy).apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis)).build();
251266
}
252267

253268
// pkg private for tests

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import java.time.Duration;
9393
import java.util.Map;
9494
import java.util.concurrent.ConcurrentHashMap;
95+
import java.util.concurrent.ScheduledExecutorService;
9596

9697
import static java.util.Collections.emptyMap;
9798

@@ -115,10 +116,20 @@ class S3Service implements Closeable {
115116
*/
116117
private volatile Map<Settings, S3ClientSettings> derivedClientSettings = new ConcurrentHashMap<>();
117118

118-
S3Service(final Path configPath) {
119+
/**
120+
* Optional scheduled executor service to use for the client
121+
*/
122+
private final @Nullable ScheduledExecutorService clientExecutorService;
123+
124+
S3Service(final Path configPath, @Nullable ScheduledExecutorService clientExecutorService) {
119125
staticClientSettings = MapBuilder.<String, S3ClientSettings>newMapBuilder()
120126
.put("default", S3ClientSettings.getClientSettings(Settings.EMPTY, "default", configPath))
121127
.immutableMap();
128+
this.clientExecutorService = clientExecutorService;
129+
}
130+
131+
S3Service(final Path configPath) {
132+
this(configPath, null);
122133
}
123134

124135
/**
@@ -204,7 +215,7 @@ AmazonS3WithCredentials buildClient(final S3ClientSettings clientSettings) {
204215
final AwsCredentialsProvider credentials = buildCredentials(logger, clientSettings);
205216
builder.credentialsProvider(credentials);
206217
builder.httpClientBuilder(buildHttpClient(clientSettings));
207-
builder.overrideConfiguration(buildOverrideConfiguration(clientSettings));
218+
builder.overrideConfiguration(buildOverrideConfiguration(clientSettings, clientExecutorService));
208219

209220
String endpoint = Strings.hasLength(clientSettings.endpoint) ? clientSettings.endpoint : DEFAULT_S3_ENDPOINT;
210221
if ((endpoint.startsWith("http://") || endpoint.startsWith("https://")) == false) {
@@ -318,8 +329,14 @@ static ProxyConfiguration buildHttpProxyConfiguration(S3ClientSettings clientSet
318329
return proxyConfiguration.build();
319330
}
320331

321-
static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
332+
static ClientOverrideConfiguration buildOverrideConfiguration(
333+
final S3ClientSettings clientSettings,
334+
@Nullable ScheduledExecutorService clientExecutorService
335+
) {
322336
ClientOverrideConfiguration.Builder clientOverrideConfiguration = ClientOverrideConfiguration.builder();
337+
if (clientExecutorService != null) {
338+
clientOverrideConfiguration = clientOverrideConfiguration.scheduledExecutorService(clientExecutorService);
339+
}
323340
if (Strings.hasLength(clientSettings.signerOverride)) {
324341
clientOverrideConfiguration = clientOverrideConfiguration.putAdvancedOption(
325342
SdkAdvancedClientOption.SIGNER,

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ private void launchAWSConfigurationTest(
358358
assertThat(proxyConfiguration.password(), is(expectedProxyPassword));
359359
}
360360

361-
final ClientOverrideConfiguration clientOverrideConfiguration = S3Service.buildOverrideConfiguration(clientSettings);
361+
final ClientOverrideConfiguration clientOverrideConfiguration = S3Service.buildOverrideConfiguration(clientSettings, null);
362362

363363
assertTrue(clientOverrideConfiguration.retryPolicy().isPresent());
364364
assertThat(clientOverrideConfiguration.retryPolicy().get().numRetries(), is(expectedMaxRetries));

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,15 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
130130
@Before
131131
public void setUp() throws Exception {
132132
previousOpenSearchPathConf = SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString()));
133-
service = new S3Service(configPath());
134-
asyncService = new S3AsyncService(configPath());
133+
scheduler = new ScheduledThreadPoolExecutor(1);
134+
service = new S3Service(configPath(), scheduler);
135+
asyncService = new S3AsyncService(configPath(), scheduler);
135136

136137
futureCompletionService = Executors.newSingleThreadExecutor();
137138
streamReaderService = Executors.newSingleThreadExecutor();
138139
transferNIOGroup = new AsyncTransferEventLoopGroup(1);
139140
remoteTransferRetry = Executors.newFixedThreadPool(20);
140141
transferQueueConsumerService = Executors.newFixedThreadPool(2);
141-
scheduler = new ScheduledThreadPoolExecutor(1);
142142
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
143143
normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
144144
new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB),

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ClientSettingsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,13 +314,13 @@ public void testSignerOverrideCanBeSet() {
314314
assertThat(settings.get("other").signerOverride, is(signerOverride));
315315

316316
ClientOverrideConfiguration defaultConfiguration = SocketAccess.doPrivileged(
317-
() -> S3Service.buildOverrideConfiguration(settings.get("default"))
317+
() -> S3Service.buildOverrideConfiguration(settings.get("default"), null)
318318
);
319319
Optional<Signer> defaultSigner = defaultConfiguration.advancedOption(SdkAdvancedClientOption.SIGNER);
320320
assertFalse(defaultSigner.isPresent());
321321

322322
ClientOverrideConfiguration configuration = SocketAccess.doPrivileged(
323-
() -> S3Service.buildOverrideConfiguration(settings.get("other"))
323+
() -> S3Service.buildOverrideConfiguration(settings.get("other"), null)
324324
);
325325
Optional<Signer> otherSigner = configuration.advancedOption(SdkAdvancedClientOption.SIGNER);
326326
assertTrue(otherSigner.isPresent());

0 commit comments

Comments
 (0)