Skip to content

Commit 59aa51b

Browse files
committed
Create separate listener to track local segments file after refresh
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 6cc8da5 commit 59aa51b

11 files changed

+236
-141
lines changed

server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
package org.opensearch.index.remote;
1010

11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.opensearch.common.CheckedFunction;
14+
import org.opensearch.common.logging.Loggers;
1115
import org.opensearch.core.common.io.stream.StreamInput;
1216
import org.opensearch.core.common.io.stream.StreamOutput;
1317
import org.opensearch.core.common.io.stream.Writeable;
@@ -17,21 +21,26 @@
1721
import org.opensearch.core.index.shard.ShardId;
1822

1923
import java.io.IOException;
20-
import java.util.HashMap;
24+
import java.util.Collection;
25+
import java.util.Collections;
2126
import java.util.HashSet;
2227
import java.util.Map;
2328
import java.util.Set;
2429
import java.util.concurrent.atomic.AtomicLong;
2530
import java.util.concurrent.atomic.AtomicReference;
2631
import java.util.stream.Collectors;
2732

33+
import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;
34+
2835
/**
2936
* Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics.
3037
*
3138
* @opensearch.internal
3239
*/
3340
public class RemoteRefreshSegmentTracker {
3441

42+
private final Logger logger;
43+
3544
/**
3645
* ShardId for which this instance tracks the remote segment upload metadata.
3746
*/
@@ -123,14 +132,14 @@ public class RemoteRefreshSegmentTracker {
123132
private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap();
124133

125134
/**
126-
* Map of name to size of the segment files created as part of the most recent refresh.
135+
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
127136
*/
128-
private volatile Map<String, Long> latestLocalFileNameLengthMap;
137+
private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap();
129138

130139
/**
131140
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
132141
*/
133-
private final Set<String> latestUploadedFiles = new HashSet<>();
142+
private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet();
134143

135144
/**
136145
* Keeps the bytes lag computed so that we do not compute it for every request.
@@ -175,6 +184,7 @@ public RemoteRefreshSegmentTracker(
175184
int uploadBytesPerSecMovingAverageWindowSize,
176185
int uploadTimeMsMovingAverageWindowSize
177186
) {
187+
logger = Loggers.getLogger(getClass(), shardId);
178188
this.shardId = shardId;
179189
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
180190
long currentClockTimeMs = System.currentTimeMillis();
@@ -186,8 +196,6 @@ public RemoteRefreshSegmentTracker(
186196
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
187197
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
188198
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));
189-
190-
latestLocalFileNameLengthMap = new HashMap<>();
191199
}
192200

193201
ShardId getShardId() {
@@ -361,12 +369,36 @@ long getRejectionCount(String rejectionReason) {
361369
return rejectionCountMap.get(rejectionReason).get();
362370
}
363371

364-
Map<String, Long> getLatestLocalFileNameLengthMap() {
365-
return latestLocalFileNameLengthMap;
372+
public Map<String, Long> getLatestLocalFileNameLengthMap() {
373+
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
366374
}
367375

368-
public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
369-
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
376+
/**
377+
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
378+
*
379+
* @param segmentFiles list of local refreshed segment files
380+
* @param fileSizeFunction function is used to determine the file size in bytes
381+
*/
382+
public void updateLatestLocalFileNameLengthMap(
383+
Collection<String> segmentFiles,
384+
CheckedFunction<String, Long, IOException> fileSizeFunction
385+
) {
386+
// Update the map
387+
segmentFiles.stream()
388+
.filter(file -> EXCLUDE_FILES.contains(file) == false)
389+
.filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0)
390+
.forEach(file -> {
391+
long fileSize = 0;
392+
try {
393+
fileSize = fileSizeFunction.apply(file);
394+
} catch (IOException e) {
395+
logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
396+
}
397+
latestLocalFileNameLengthMap.put(file, fileSize);
398+
});
399+
Set<String> fileSet = new HashSet<>(segmentFiles);
400+
// Remove keys from the fileSizeMap that do not exist in the latest segment files
401+
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
370402
computeBytesLag();
371403
}
372404

@@ -382,7 +414,7 @@ public void setLatestUploadedFiles(Set<String> files) {
382414
}
383415

384416
private void computeBytesLag() {
385-
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
417+
if (latestLocalFileNameLengthMap.isEmpty()) {
386418
return;
387419
}
388420
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()

server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void beforeRefresh() throws IOException {
3939
}
4040

4141
@Override
42-
protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
42+
protected boolean performAfterRefresh(boolean didRefresh) {
4343
if (didRefresh
4444
&& shard.state() == IndexShardState.STARTED
4545
&& shard.getReplicationTracker().isPrimaryMode()

server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.io.IOException;
1818
import java.util.concurrent.Semaphore;
1919
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.TimeoutException;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
/**
@@ -30,7 +31,9 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana
3031
* Total permits = 1 ensures that there is only single instance of performAfterRefresh that is running at a time.
3132
* In case there are use cases where concurrency is required, the total permit variable can be put inside the ctor.
3233
*/
33-
private static final int TOTAL_PERMITS = 1;
34+
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
35+
36+
private final AtomicBoolean closed = new AtomicBoolean(false);
3437

3538
private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);
3639

@@ -51,16 +54,10 @@ public CloseableRetryableRefreshListener(ThreadPool threadPool) {
5154

5255
@Override
5356
public final void afterRefresh(boolean didRefresh) throws IOException {
54-
boolean successful;
55-
boolean permitAcquired = semaphore.tryAcquire();
56-
try {
57-
successful = permitAcquired && performAfterRefresh(didRefresh, false);
58-
} finally {
59-
if (permitAcquired) {
60-
semaphore.release();
61-
}
57+
if (closed.get()) {
58+
return;
6259
}
63-
scheduleRetry(successful, didRefresh, permitAcquired);
60+
run(didRefresh, () -> {});
6461
}
6562

6663
protected String getRetryThreadPoolName() {
@@ -71,7 +68,12 @@ protected TimeValue getNextRetryInterval() {
7168
return null;
7269
}
7370

74-
private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh, boolean isRetry) {
71+
private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh) {
72+
// If the underlying listener has closed, then we do not allow even the retry to be scheduled
73+
if (closed.get()) {
74+
return;
75+
}
76+
7577
if (this.threadPool == null
7678
|| interval == null
7779
|| retryThreadPoolName == null
@@ -80,62 +82,65 @@ private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boole
8082
|| retryScheduled.compareAndSet(false, true) == false) {
8183
return;
8284
}
85+
8386
boolean scheduled = false;
8487
try {
85-
this.threadPool.schedule(() -> {
86-
boolean successful;
87-
boolean permitAcquired = semaphore.tryAcquire();
88-
try {
89-
successful = permitAcquired && performAfterRefresh(didRefresh, isRetry);
90-
} finally {
91-
if (permitAcquired) {
92-
semaphore.release();
93-
}
94-
retryScheduled.set(false);
95-
}
96-
scheduleRetry(successful, didRefresh, isRetry || permitAcquired);
97-
}, interval, retryThreadPoolName);
88+
this.threadPool.schedule(() -> run(didRefresh, () -> retryScheduled.set(false)), interval, retryThreadPoolName);
9889
scheduled = true;
99-
getLogger().info("Scheduled retry with didRefresh={} isRetry={}", didRefresh, isRetry);
90+
getLogger().info("Scheduled retry with didRefresh={}", didRefresh);
10091
} finally {
10192
if (scheduled == false) {
10293
retryScheduled.set(false);
10394
}
10495
}
10596
}
10697

98+
private void run(boolean didRefresh, Runnable runFinally) {
99+
boolean successful;
100+
boolean permitAcquired = semaphore.tryAcquire();
101+
try {
102+
successful = permitAcquired && performAfterRefresh(didRefresh);
103+
} finally {
104+
if (permitAcquired) {
105+
semaphore.release();
106+
}
107+
runFinally.run();
108+
}
109+
assert permitAcquired;
110+
scheduleRetry(successful, didRefresh);
111+
}
112+
107113
/**
108114
* Schedules the retry based on the {@code afterRefreshSuccessful} value.
109115
*
110116
* @param afterRefreshSuccessful is sent true if the performAfterRefresh(..) is successful.
111117
* @param didRefresh if the refresh did open a new reference then didRefresh will be true
112-
* @param isRetry if this is a failure or permit was not acquired.
113118
*/
114-
private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh, boolean isRetry) {
119+
private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) {
115120
if (afterRefreshSuccessful == false) {
116-
scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh, isRetry);
121+
scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh);
117122
}
118123
}
119124

120125
/**
121126
* This method needs to be overridden and be provided with what needs to be run on after refresh.
122127
*
123128
* @param didRefresh true if the refresh opened a new reference
124-
* @param isRetry true if this is a retry attempt
125129
* @return true if a retry is needed else false.
126130
*/
127-
protected abstract boolean performAfterRefresh(boolean didRefresh, boolean isRetry);
131+
protected abstract boolean performAfterRefresh(boolean didRefresh);
128132

129133
@Override
130134
public final void close() throws IOException {
131135
try {
132136
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
133-
assert semaphore.availablePermits() == 0;
137+
boolean result = closed.compareAndSet(false, true);
138+
assert result && semaphore.availablePermits() == 0;
134139
} else {
135-
throw new RuntimeException("timeout while closing gated refresh listener");
140+
throw new TimeoutException("timeout while closing gated refresh listener");
136141
}
137-
} catch (InterruptedException e) {
138-
throw new RuntimeException(e);
142+
} catch (InterruptedException | TimeoutException e) {
143+
throw new RuntimeException("Failed to close the closeable retryable listener", e);
139144
}
140145
}
141146

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3675,6 +3675,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
36753675
}
36763676

36773677
if (isRemoteStoreEnabled()) {
3678+
internalRefreshListener.add(
3679+
new RemoteSegmentTrackerListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()))
3680+
);
36783681
internalRefreshListener.add(
36793682
new RemoteStoreRefreshListener(
36803683
this,
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.index.SegmentInfos;
13+
import org.apache.lucene.search.ReferenceManager;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.FilterDirectory;
16+
import org.opensearch.common.concurrent.GatedCloseable;
17+
import org.opensearch.common.logging.Loggers;
18+
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
19+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
20+
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
24+
/**
25+
* This listener updates the remote segment tracker with the segment files of the most recent refresh. This is helpful in
26+
* determining the lag and hence applying rejection on lagging remote uploads.
27+
*
28+
* @opensearch.internal
29+
*/
30+
public class RemoteSegmentTrackerListener implements ReferenceManager.RefreshListener {
31+
32+
private final Logger logger;
33+
private final IndexShard indexShard;
34+
private final RemoteRefreshSegmentTracker segmentTracker;
35+
private final RemoteSegmentStoreDirectory remoteDirectory;
36+
private final Directory storeDirectory;
37+
private long primaryTerm;
38+
39+
public RemoteSegmentTrackerListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) {
40+
this.indexShard = indexShard;
41+
this.segmentTracker = segmentTracker;
42+
logger = Loggers.getLogger(getClass(), indexShard.shardId());
43+
storeDirectory = indexShard.store().directory();
44+
remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
45+
.getDelegate()).getDelegate();
46+
if (indexShard.routingEntry().primary()) {
47+
try {
48+
this.remoteDirectory.init();
49+
} catch (IOException e) {
50+
logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
51+
}
52+
}
53+
this.primaryTerm = remoteDirectory.getPrimaryTermAtInit();
54+
}
55+
56+
@Override
57+
public void beforeRefresh() throws IOException {}
58+
59+
@Override
60+
public void afterRefresh(boolean didRefresh) throws IOException {
61+
if (didRefresh
62+
|| this.primaryTerm != indexShard.getOperationPrimaryTerm()
63+
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
64+
updateLocalRefreshTimeAndSeqNo();
65+
try {
66+
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
67+
this.primaryTerm = indexShard.getOperationPrimaryTerm();
68+
}
69+
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
70+
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
71+
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
72+
}
73+
} catch (Throwable t) {
74+
logger.error("Exception in RemoteSegmentTrackerListener.afterRefresh()", t);
75+
}
76+
}
77+
}
78+
79+
/**
80+
* Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size.
81+
*
82+
* @param segmentFiles list of segment files that are part of the most recent local refresh.
83+
*/
84+
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
85+
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
86+
}
87+
88+
/**
89+
* Updates the last refresh time and refresh seq no which is seen by local store.
90+
*/
91+
private void updateLocalRefreshTimeAndSeqNo() {
92+
segmentTracker.updateLocalRefreshClockTimeMs(System.currentTimeMillis());
93+
segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
94+
segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1);
95+
}
96+
}

0 commit comments

Comments
 (0)