Skip to content

Commit 1e3e35d

Browse files
authored
Change Remote state read thread pool to Fixed type (#16850) (#16879)
* Change Remote state read thread pool to Fixed type Signed-off-by: Sooraj Sinha <soosinha@amazon.com> (cherry picked from commit 8aa3185)
1 parent 444599c commit 1e3e35d

File tree

6 files changed

+33
-5
lines changed

6 files changed

+33
-5
lines changed

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,13 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes
430430

431431
if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) {
432432
try {
433+
logger.info(
434+
() -> new ParameterizedMessage(
435+
"Term version checker downloading full cluster state for term {}, version {}",
436+
termVersion.getTerm(),
437+
termVersion.getVersion()
438+
)
439+
);
433440
ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion();
434441
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteClusterStateService
435442
.getClusterMetadataManifestByTermVersion(
@@ -454,7 +461,7 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes
454461
return clusterStateFromRemote;
455462
}
456463
} catch (Exception e) {
457-
logger.trace("Error while fetching from remote cluster state", e);
464+
logger.error("Error while fetching from remote cluster state", e);
458465
}
459466
}
460467
return null;

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
258258
}
259259

260260
if (applyFullState == true) {
261-
logger.debug(
261+
logger.info(
262262
() -> new ParameterizedMessage(
263263
"Downloading full cluster state for term {}, version {}, stateUUID {}",
264264
manifest.getClusterTerm(),

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,8 +1473,22 @@ public ClusterState getClusterStateForManifest(
14731473
try {
14741474
ClusterState stateFromCache = remoteClusterStateCache.getState(clusterName, manifest);
14751475
if (stateFromCache != null) {
1476+
logger.trace(
1477+
() -> new ParameterizedMessage(
1478+
"Found cluster state in cache for term {} and version {}",
1479+
manifest.getClusterTerm(),
1480+
manifest.getStateVersion()
1481+
)
1482+
);
14761483
return stateFromCache;
14771484
}
1485+
logger.info(
1486+
() -> new ParameterizedMessage(
1487+
"Cluster state not found in cache for term {} and version {}",
1488+
manifest.getClusterTerm(),
1489+
manifest.getStateVersion()
1490+
)
1491+
);
14781492

14791493
final ClusterState clusterState;
14801494
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public static ThreadPoolType fromType(String type) {
198198
map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING);
199199
map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING);
200200
map.put(Names.INDEX_SEARCHER, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
201-
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING);
201+
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED);
202202
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
203203
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
204204
}
@@ -317,7 +317,7 @@ public ThreadPool(
317317
);
318318
builders.put(
319319
Names.REMOTE_STATE_READ,
320-
new ScalingExecutorBuilder(Names.REMOTE_STATE_READ, 1, boundedBy(4 * allocatedProcessors, 4, 32), TimeValue.timeValueMinutes(5))
320+
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_READ, boundedBy(4 * allocatedProcessors, 4, 32), 120000)
321321
);
322322
builders.put(
323323
Names.INDEX_SEARCHER,

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2354,6 +2354,14 @@ public void testReadLatestClusterStateFromCache() throws IOException {
23542354
.getState(clusterState.getClusterName().value(), expectedManifest);
23552355
assertEquals(stateFromCache.getMetadata(), state.getMetadata());
23562356

2357+
ClusterState stateFromCache2 = remoteClusterStateService.getClusterStateForManifest(
2358+
clusterState.getClusterName().value(),
2359+
expectedManifest,
2360+
"nodeA",
2361+
true
2362+
);
2363+
assertEquals(stateFromCache2.getMetadata(), state.getMetadata());
2364+
23572365
final ClusterMetadataManifest notExistMetadata = ClusterMetadataManifest.builder()
23582366
.indices(List.of())
23592367
.clusterTerm(1L)

server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
156156
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessors);
157157
sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors);
158158
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors);
159-
sizes.put(ThreadPool.Names.REMOTE_STATE_READ, n -> ThreadPool.boundedBy(4 * n, 4, 32));
160159
return sizes.get(threadPoolName).apply(numberOfProcessors);
161160
}
162161

0 commit comments

Comments
 (0)