Skip to content

Commit 3910aab

Browse files
committed
Create separate listener to track local segments file after refresh
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 6cc8da5 commit 3910aab

File tree

8 files changed

+185
-95
lines changed

8 files changed

+185
-95
lines changed

server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
package org.opensearch.index.remote;
1010

11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.opensearch.common.CheckedFunction;
14+
import org.opensearch.common.logging.Loggers;
1115
import org.opensearch.core.common.io.stream.StreamInput;
1216
import org.opensearch.core.common.io.stream.StreamOutput;
1317
import org.opensearch.core.common.io.stream.Writeable;
@@ -17,21 +21,26 @@
1721
import org.opensearch.core.index.shard.ShardId;
1822

1923
import java.io.IOException;
20-
import java.util.HashMap;
24+
import java.util.Collection;
25+
import java.util.Collections;
2126
import java.util.HashSet;
2227
import java.util.Map;
2328
import java.util.Set;
2429
import java.util.concurrent.atomic.AtomicLong;
2530
import java.util.concurrent.atomic.AtomicReference;
2631
import java.util.stream.Collectors;
2732

33+
import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;
34+
2835
/**
2936
* Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics.
3037
*
3138
* @opensearch.internal
3239
*/
3340
public class RemoteRefreshSegmentTracker {
3441

42+
private final Logger logger;
43+
3544
/**
3645
* ShardId for which this instance tracks the remote segment upload metadata.
3746
*/
@@ -123,14 +132,14 @@ public class RemoteRefreshSegmentTracker {
123132
private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap();
124133

125134
/**
126-
* Map of name to size of the segment files created as part of the most recent refresh.
135+
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
127136
*/
128-
private volatile Map<String, Long> latestLocalFileNameLengthMap;
137+
private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap();
129138

130139
/**
131140
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
132141
*/
133-
private final Set<String> latestUploadedFiles = new HashSet<>();
142+
private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet();
134143

135144
/**
136145
* Keeps the bytes lag computed so that we do not compute it for every request.
@@ -175,6 +184,7 @@ public RemoteRefreshSegmentTracker(
175184
int uploadBytesPerSecMovingAverageWindowSize,
176185
int uploadTimeMsMovingAverageWindowSize
177186
) {
187+
logger = Loggers.getLogger(getClass(), shardId);
178188
this.shardId = shardId;
179189
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
180190
long currentClockTimeMs = System.currentTimeMillis();
@@ -186,8 +196,6 @@ public RemoteRefreshSegmentTracker(
186196
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
187197
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
188198
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));
189-
190-
latestLocalFileNameLengthMap = new HashMap<>();
191199
}
192200

193201
ShardId getShardId() {
@@ -361,12 +369,36 @@ long getRejectionCount(String rejectionReason) {
361369
return rejectionCountMap.get(rejectionReason).get();
362370
}
363371

364-
Map<String, Long> getLatestLocalFileNameLengthMap() {
365-
return latestLocalFileNameLengthMap;
372+
public Map<String, Long> getLatestLocalFileNameLengthMap() {
373+
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
366374
}
367375

368-
public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
369-
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
376+
/**
377+
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
378+
*
379+
* @param segmentFiles list of local refreshed segment files
380+
* @param fileSizeFunction function is used to determine the file size in bytes
381+
*/
382+
public void updateLatestLocalFileNameLengthMap(
383+
Collection<String> segmentFiles,
384+
CheckedFunction<String, Long, IOException> fileSizeFunction
385+
) {
386+
// Update the map
387+
segmentFiles.stream()
388+
.filter(file -> EXCLUDE_FILES.contains(file) == false)
389+
.filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0)
390+
.forEach(file -> {
391+
long fileSize = 0;
392+
try {
393+
fileSize = fileSizeFunction.apply(file);
394+
} catch (IOException e) {
395+
logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
396+
}
397+
latestLocalFileNameLengthMap.put(file, fileSize);
398+
});
399+
Set<String> fileSet = new HashSet<>(segmentFiles);
400+
// Remove keys from the fileSizeMap that do not exist in the latest segment files
401+
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
370402
computeBytesLag();
371403
}
372404

@@ -382,7 +414,7 @@ public void setLatestUploadedFiles(Set<String> files) {
382414
}
383415

384416
private void computeBytesLag() {
385-
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
417+
if (latestLocalFileNameLengthMap.isEmpty()) {
386418
return;
387419
}
388420
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3675,6 +3675,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
36753675
}
36763676

36773677
if (isRemoteStoreEnabled()) {
3678+
internalRefreshListener.add(
3679+
new RemoteSegmentTrackerListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()))
3680+
);
36783681
internalRefreshListener.add(
36793682
new RemoteStoreRefreshListener(
36803683
this,
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.index.SegmentInfos;
13+
import org.apache.lucene.search.ReferenceManager;
14+
import org.apache.lucene.store.Directory;
15+
import org.apache.lucene.store.FilterDirectory;
16+
import org.opensearch.common.concurrent.GatedCloseable;
17+
import org.opensearch.common.logging.Loggers;
18+
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
19+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
20+
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
24+
/**
25+
* This listener updates the remote segment tracker with the segment files of the most recent refresh. This is helpful in
26+
* determining the lag and hence applying rejection on lagging remote uploads.
27+
*
28+
* @opensearch.internal
29+
*/
30+
public class RemoteSegmentTrackerListener implements ReferenceManager.RefreshListener {
31+
32+
private final Logger logger;
33+
private final IndexShard indexShard;
34+
private final RemoteRefreshSegmentTracker segmentTracker;
35+
private final RemoteSegmentStoreDirectory remoteDirectory;
36+
private final Directory storeDirectory;
37+
private long primaryTerm;
38+
39+
public RemoteSegmentTrackerListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) {
40+
this.indexShard = indexShard;
41+
this.segmentTracker = segmentTracker;
42+
logger = Loggers.getLogger(getClass(), indexShard.shardId());
43+
storeDirectory = indexShard.store().directory();
44+
remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
45+
.getDelegate()).getDelegate();
46+
if (indexShard.routingEntry().primary()) {
47+
try {
48+
this.remoteDirectory.init();
49+
} catch (IOException e) {
50+
logger.error("Exception while initialising RemoteSegmentStoreDirectory", e);
51+
}
52+
}
53+
this.primaryTerm = remoteDirectory.getPrimaryTermAtInit();
54+
}
55+
56+
@Override
57+
public void beforeRefresh() throws IOException {}
58+
59+
@Override
60+
public void afterRefresh(boolean didRefresh) throws IOException {
61+
if (didRefresh
62+
|| this.primaryTerm != indexShard.getOperationPrimaryTerm()
63+
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
64+
updateLocalRefreshTimeAndSeqNo();
65+
try {
66+
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
67+
this.primaryTerm = indexShard.getOperationPrimaryTerm();
68+
}
69+
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
70+
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
71+
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
72+
}
73+
} catch (Throwable t) {
74+
logger.error("Exception in RemoteSegmentTrackerListener.afterRefresh()", t);
75+
}
76+
}
77+
}
78+
79+
/**
80+
* Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size.
81+
*
82+
* @param segmentFiles list of segment files that are part of the most recent local refresh.
83+
*/
84+
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
85+
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
86+
}
87+
88+
/**
89+
* Updates the last refresh time and refresh seq no which is seen by local store.
90+
*/
91+
private void updateLocalRefreshTimeAndSeqNo() {
92+
segmentTracker.updateLocalRefreshClockTimeMs(System.currentTimeMillis());
93+
segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
94+
segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1);
95+
}
96+
}

0 commit comments

Comments
 (0)