Skip to content

Commit 11493a5

Browse files
committed
Create separate listener to track local segments file after refresh
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 230a06d commit 11493a5

File tree

8 files changed

+190
-94
lines changed

8 files changed

+190
-94
lines changed

server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
package org.opensearch.index.remote;
1010

11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.opensearch.common.CheckedFunction;
14+
import org.opensearch.common.logging.Loggers;
1115
import org.opensearch.core.common.io.stream.StreamInput;
1216
import org.opensearch.core.common.io.stream.StreamOutput;
1317
import org.opensearch.core.common.io.stream.Writeable;
@@ -17,21 +21,25 @@
1721
import org.opensearch.core.index.shard.ShardId;
1822

1923
import java.io.IOException;
20-
import java.util.HashMap;
24+
import java.util.Collection;
2125
import java.util.HashSet;
2226
import java.util.Map;
2327
import java.util.Set;
2428
import java.util.concurrent.atomic.AtomicLong;
2529
import java.util.concurrent.atomic.AtomicReference;
2630
import java.util.stream.Collectors;
2731

32+
import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;
33+
2834
/**
2935
* Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics.
3036
*
3137
* @opensearch.internal
3238
*/
3339
public class RemoteRefreshSegmentTracker {
3440

41+
private final Logger logger;
42+
3543
/**
3644
* ShardId for which this instance tracks the remote segment upload metadata.
3745
*/
@@ -123,14 +131,14 @@ public class RemoteRefreshSegmentTracker {
123131
private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap();
124132

125133
/**
126-
* Map of name to size of the segment files created as part of the most recent refresh.
134+
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
127135
*/
128-
private volatile Map<String, Long> latestLocalFileNameLengthMap;
136+
private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap();
129137

130138
/**
131139
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
132140
*/
133-
private final Set<String> latestUploadedFiles = new HashSet<>();
141+
private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet();
134142

135143
/**
136144
* Keeps the bytes lag computed so that we do not compute it for every request.
@@ -175,6 +183,7 @@ public RemoteRefreshSegmentTracker(
175183
int uploadBytesPerSecMovingAverageWindowSize,
176184
int uploadTimeMsMovingAverageWindowSize
177185
) {
186+
logger = Loggers.getLogger(getClass(), shardId);
178187
this.shardId = shardId;
179188
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
180189
long currentClockTimeMs = System.currentTimeMillis();
@@ -186,8 +195,6 @@ public RemoteRefreshSegmentTracker(
186195
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
187196
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
188197
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));
189-
190-
latestLocalFileNameLengthMap = new HashMap<>();
191198
}
192199

193200
ShardId getShardId() {
@@ -361,12 +368,43 @@ long getRejectionCount(String rejectionReason) {
361368
return rejectionCountMap.get(rejectionReason).get();
362369
}
363370

364-
Map<String, Long> getLatestLocalFileNameLengthMap() {
371+
public Map<String, Long> getLatestLocalFileNameLengthMap() {
365372
return latestLocalFileNameLengthMap;
366373
}
367374

368-
public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
369-
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
375+
/**
376+
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
377+
*
378+
* @param segmentFiles list of local refreshed segment files
379+
* @param fileSizeFunction function is used to determine the file size in bytes
380+
*/
381+
382+
/**
383+
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
384+
*
385+
* @param segmentFiles list of local refreshed segment files
386+
* @param fileSizeFunction function is used to determine the file size in bytes
387+
*/
388+
public void updateLatestLocalFileNameLengthMap(
389+
Collection<String> segmentFiles,
390+
CheckedFunction<String, Long, IOException> fileSizeFunction
391+
) {
392+
// Update the map
393+
segmentFiles.stream()
394+
.filter(file -> EXCLUDE_FILES.contains(file) == false)
395+
.filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0)
396+
.forEach(file -> {
397+
long fileSize = 0;
398+
try {
399+
fileSize = fileSizeFunction.apply(file);
400+
} catch (IOException e) {
401+
logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
402+
}
403+
latestLocalFileNameLengthMap.put(file, fileSize);
404+
});
405+
Set<String> fileSet = new HashSet<>(segmentFiles);
406+
// Remove keys from the fileSizeMap that do not exist in the latest segment files
407+
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
370408
computeBytesLag();
371409
}
372410

@@ -382,7 +420,7 @@ public void setLatestUploadedFiles(Set<String> files) {
382420
}
383421

384422
private void computeBytesLag() {
385-
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
423+
if (latestLocalFileNameLengthMap.isEmpty()) {
386424
return;
387425
}
388426
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3675,6 +3675,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
36753675
}
36763676

36773677
if (isRemoteStoreEnabled()) {
3678+
internalRefreshListener.add(
3679+
new RemoteSegmentTrackerListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()))
3680+
);
36783681
internalRefreshListener.add(
36793682
new RemoteStoreRefreshListener(
36803683
this,
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.index.SegmentInfos;
13+
import org.apache.lucene.search.ReferenceManager;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.FilterDirectory;
16+
import org.opensearch.common.concurrent.GatedCloseable;
17+
import org.opensearch.common.logging.Loggers;
18+
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
19+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
20+
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
24+
/**
25+
* This listener updates the remote segment tracker with the segment files of the most recent refresh. This is helpful in
26+
* determining the lag and hence applying rejection on lagging remote uploads.
27+
*
28+
* @opensearch.internal
29+
*/
30+
public class RemoteSegmentTrackerListener implements ReferenceManager.RefreshListener {
31+
32+
private final Logger logger;
33+
private final IndexShard indexShard;
34+
private final RemoteRefreshSegmentTracker segmentTracker;
35+
private final RemoteSegmentStoreDirectory remoteDirectory;
36+
private final Directory storeDirectory;
37+
private long primaryTerm;
38+
39+
public RemoteSegmentTrackerListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) {
40+
this.indexShard = indexShard;
41+
this.segmentTracker = segmentTracker;
42+
logger = Loggers.getLogger(getClass(), indexShard.shardId());
43+
storeDirectory = indexShard.store().directory();
44+
remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
45+
.getDelegate()).getDelegate();
46+
if (indexShard.routingEntry().primary()) {
47+
try {
48+
this.remoteDirectory.init();
49+
} catch (IOException e) {
50+
logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
51+
}
52+
}
53+
this.primaryTerm = remoteDirectory.getPrimaryTermAtInit();
54+
}
55+
56+
@Override
57+
public void beforeRefresh() throws IOException {}
58+
59+
@Override
60+
public void afterRefresh(boolean didRefresh) throws IOException {
61+
if (didRefresh
62+
|| this.primaryTerm != indexShard.getOperationPrimaryTerm()
63+
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
64+
updateLocalRefreshTimeAndSeqNo();
65+
try {
66+
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
67+
this.primaryTerm = indexShard.getOperationPrimaryTerm();
68+
}
69+
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
70+
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
71+
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
72+
}
73+
} catch (Throwable t) {
74+
logger.error("Exception in RemoteSegmentTrackerListener.afterRefresh()", t);
75+
}
76+
}
77+
}
78+
79+
/**
80+
* Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size.
81+
*
82+
* @param segmentFiles list of segment files that are part of the most recent local refresh.
83+
*/
84+
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
85+
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
86+
}
87+
88+
/**
89+
* Updates the last refresh time and refresh seq no which is seen by local store.
90+
*/
91+
private void updateLocalRefreshTimeAndSeqNo() {
92+
segmentTracker.updateLocalRefreshClockTimeMs(System.currentTimeMillis());
93+
segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
94+
segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1);
95+
}
96+
}

0 commit comments

Comments
 (0)