88
99package org .opensearch .index .remote ;
1010
11+ import org .apache .logging .log4j .Logger ;
12+ import org .apache .logging .log4j .message .ParameterizedMessage ;
13+ import org .opensearch .common .CheckedFunction ;
14+ import org .opensearch .common .logging .Loggers ;
1115import org .opensearch .core .common .io .stream .StreamInput ;
1216import org .opensearch .core .common .io .stream .StreamOutput ;
1317import org .opensearch .core .common .io .stream .Writeable ;
1822import org .opensearch .index .store .DirectoryFileTransferTracker ;
1923
2024import java .io .IOException ;
21- import java .util .HashMap ;
25+ import java .util .Collection ;
26+ import java .util .Collections ;
2227import java .util .HashSet ;
2328import java .util .Map ;
2429import java .util .Set ;
2530import java .util .concurrent .atomic .AtomicLong ;
2631import java .util .concurrent .atomic .AtomicReference ;
2732import java .util .stream .Collectors ;
2833
34+ import static org .opensearch .index .shard .RemoteStoreRefreshListener .EXCLUDE_FILES ;
35+
2936/**
3037 * Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics.
3138 *
3239 * @opensearch.internal
3340 */
3441public class RemoteSegmentTransferTracker {
3542
43+ private final Logger logger ;
44+
3645 /**
3746 * ShardId for which this instance tracks the remote segment upload metadata.
3847 */
@@ -124,14 +133,15 @@ public class RemoteSegmentTransferTracker {
124133 private final Map <String , AtomicLong > rejectionCountMap = ConcurrentCollections .newConcurrentMap ();
125134
126135 /**
127- * Map of name to size of the segment files created as part of the most recent refresh.
136+ * Keeps track of segment files and their size in bytes which are part of the most recent refresh.
128137 */
129- private volatile Map <String , Long > latestLocalFileNameLengthMap ;
138+ private final Map <String , Long > latestLocalFileNameLengthMap = ConcurrentCollections . newConcurrentMap () ;
130139
131140 /**
132- * Set of names of segment files that were uploaded as part of the most recent remote refresh.
141+ * This contains the files from the last successful remote refresh and ongoing uploads. This gets reset to just the
142+ * last successful remote refresh state on successful remote refresh.
133143 */
134- private final Set <String > latestUploadedFiles = new HashSet <> ();
144+ private final Set <String > latestUploadedFiles = ConcurrentCollections . newConcurrentSet ();
135145
136146 /**
137147 * Keeps the bytes lag computed so that we do not compute it for every request.
@@ -182,6 +192,7 @@ public RemoteSegmentTransferTracker(
182192 int uploadBytesPerSecMovingAverageWindowSize ,
183193 int uploadTimeMsMovingAverageWindowSize
184194 ) {
195+ logger = Loggers .getLogger (getClass (), shardId );
185196 this .shardId = shardId ;
186197 // Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
187198 long currentClockTimeMs = System .currentTimeMillis ();
@@ -193,8 +204,6 @@ public RemoteSegmentTransferTracker(
193204 uploadBytesMovingAverageReference = new AtomicReference <>(new MovingAverage (uploadBytesMovingAverageWindowSize ));
194205 uploadBytesPerSecMovingAverageReference = new AtomicReference <>(new MovingAverage (uploadBytesPerSecMovingAverageWindowSize ));
195206 uploadTimeMsMovingAverageReference = new AtomicReference <>(new MovingAverage (uploadTimeMsMovingAverageWindowSize ));
196-
197- latestLocalFileNameLengthMap = new HashMap <>();
198207 this .directoryFileTransferTracker = directoryFileTransferTracker ;
199208 }
200209
@@ -206,7 +215,8 @@ public long getLocalRefreshSeqNo() {
206215 return localRefreshSeqNo ;
207216 }
208217
209- public void updateLocalRefreshSeqNo (long localRefreshSeqNo ) {
218+ // Visible for testing
219+ void updateLocalRefreshSeqNo (long localRefreshSeqNo ) {
210220 assert localRefreshSeqNo >= this .localRefreshSeqNo : "newLocalRefreshSeqNo="
211221 + localRefreshSeqNo
212222 + " < "
@@ -224,7 +234,17 @@ public long getLocalRefreshClockTimeMs() {
224234 return localRefreshClockTimeMs ;
225235 }
226236
227- public void updateLocalRefreshTimeMs (long localRefreshTimeMs ) {
237+ /**
238+ * Updates the last refresh time and refresh seq no which is seen by local store.
239+ */
240+ public void updateLocalRefreshTimeAndSeqNo () {
241+ updateLocalRefreshClockTimeMs (System .currentTimeMillis ());
242+ updateLocalRefreshTimeMs (System .nanoTime () / 1_000_000L );
243+ updateLocalRefreshSeqNo (getLocalRefreshSeqNo () + 1 );
244+ }
245+
246+ // Visible for testing
247+ void updateLocalRefreshTimeMs (long localRefreshTimeMs ) {
228248 assert localRefreshTimeMs >= this .localRefreshTimeMs : "newLocalRefreshTimeMs="
229249 + localRefreshTimeMs
230250 + " < "
@@ -234,7 +254,7 @@ public void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
234254 computeTimeMsLag ();
235255 }
236256
237- public void updateLocalRefreshClockTimeMs (long localRefreshClockTimeMs ) {
257+ private void updateLocalRefreshClockTimeMs (long localRefreshClockTimeMs ) {
238258 this .localRefreshClockTimeMs = localRefreshClockTimeMs ;
239259 }
240260
@@ -369,12 +389,36 @@ long getRejectionCount(String rejectionReason) {
369389 return rejectionCountMap .get (rejectionReason ).get ();
370390 }
371391
372- Map <String , Long > getLatestLocalFileNameLengthMap () {
373- return latestLocalFileNameLengthMap ;
392+ public Map <String , Long > getLatestLocalFileNameLengthMap () {
393+ return Collections . unmodifiableMap ( latestLocalFileNameLengthMap ) ;
374394 }
375395
376- public void setLatestLocalFileNameLengthMap (Map <String , Long > latestLocalFileNameLengthMap ) {
377- this .latestLocalFileNameLengthMap = latestLocalFileNameLengthMap ;
396+ /**
397+ * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
398+ *
399+ * @param segmentFiles list of local refreshed segment files
400+ * @param fileSizeFunction function is used to determine the file size in bytes
401+ */
402+ public void updateLatestLocalFileNameLengthMap (
403+ Collection <String > segmentFiles ,
404+ CheckedFunction <String , Long , IOException > fileSizeFunction
405+ ) {
406+ // Update the map
407+ segmentFiles .stream ()
408+ .filter (file -> EXCLUDE_FILES .contains (file ) == false )
409+ .filter (file -> latestLocalFileNameLengthMap .containsKey (file ) == false || latestLocalFileNameLengthMap .get (file ) == 0 )
410+ .forEach (file -> {
411+ long fileSize = 0 ;
412+ try {
413+ fileSize = fileSizeFunction .apply (file );
414+ } catch (IOException e ) {
415+ logger .warn (new ParameterizedMessage ("Exception while reading the fileLength of file={}" , file ), e );
416+ }
417+ latestLocalFileNameLengthMap .put (file , fileSize );
418+ });
419+ Set <String > fileSet = new HashSet <>(segmentFiles );
420+ // Remove keys from the fileSizeMap that do not exist in the latest segment files
421+ latestLocalFileNameLengthMap .entrySet ().removeIf (entry -> fileSet .contains (entry .getKey ()) == false );
378422 computeBytesLag ();
379423 }
380424
@@ -390,7 +434,7 @@ public void setLatestUploadedFiles(Set<String> files) {
390434 }
391435
392436 private void computeBytesLag () {
393- if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap .isEmpty ()) {
437+ if (latestLocalFileNameLengthMap .isEmpty ()) {
394438 return ;
395439 }
396440 Set <String > filesNotYetUploaded = latestLocalFileNameLengthMap .keySet ()
0 commit comments