8
8
9
9
package org .opensearch .common .remote ;
10
10
11
+ import org .apache .logging .log4j .LogManager ;
12
+ import org .apache .logging .log4j .Logger ;
11
13
import org .opensearch .common .blobstore .BlobPath ;
12
14
import org .opensearch .common .blobstore .stream .write .WritePriority ;
15
+ import org .opensearch .common .settings .ClusterSettings ;
16
+ import org .opensearch .common .settings .Setting ;
17
+ import org .opensearch .common .unit .TimeValue ;
13
18
import org .opensearch .core .action .ActionListener ;
14
19
import org .opensearch .index .translog .transfer .BlobStoreTransferService ;
15
20
import org .opensearch .repositories .blobstore .BlobStoreRepository ;
29
34
*/
30
35
public class RemoteWriteableEntityBlobStore <T , U extends RemoteWriteableBlobEntity <T >> implements RemoteWritableEntityStore <T , U > {
31
36
37
+ private static final Logger logger = LogManager .getLogger (RemoteWriteableEntityBlobStore .class );
32
38
private final BlobStoreTransferService transferService ;
33
39
private final BlobStoreRepository blobStoreRepository ;
34
40
private final String clusterName ;
35
41
private final ExecutorService executorService ;
36
42
private final String pathToken ;
43
+ /**
44
+ * Used to identify and log read tasks/entities that take considerably longer than expected to complete.
45
+ * The threshold applies to the total time spent reading the blob and deserializing the input stream.
46
+ */
47
+ public static final Setting <TimeValue > REMOTE_STORE_SLOW_READ_LOGGING_THRESHOLD_SETTING = Setting .positiveTimeSetting (
48
+ "cluster.remote_store.slow_read_logging_threshold" ,
49
+ TimeValue .timeValueSeconds (10 ),
50
+ Setting .Property .Dynamic ,
51
+ Setting .Property .NodeScope
52
+ );
53
+ private volatile TimeValue slowReadLoggingThreshold ;
37
54
38
55
/**
 * Creates a blob store for remote writeable entities.
 *
 * <p>The slow-read logging threshold is initialised from the current cluster settings and is
 * kept up to date by registering a settings update consumer for
 * {@link #REMOTE_STORE_SLOW_READ_LOGGING_THRESHOLD_SETTING}.
 *
 * @param blobStoreTransferService transfer service used to move blobs to and from the remote store
 * @param blobStoreRepository      repository backing this store
 * @param clusterName              name of the cluster, stored for later use
 * @param threadPool               thread pool from which the executor is resolved
 * @param executor                 name passed to {@code threadPool.executor(...)} to resolve the executor service
 * @param pathToken                path token, stored for later use
 * @param clusterSettings          cluster settings used to read and track the slow-read logging threshold
 */
public RemoteWriteableEntityBlobStore(
    final BlobStoreTransferService blobStoreTransferService,
    final BlobStoreRepository blobStoreRepository,
    final String clusterName,
    final ThreadPool threadPool,
    final String executor,
    final String pathToken,
    final ClusterSettings clusterSettings
) {
    this.transferService = blobStoreTransferService;
    this.blobStoreRepository = blobStoreRepository;
    this.clusterName = clusterName;
    this.executorService = threadPool.executor(executor);
    this.pathToken = pathToken;

    // Seed the threshold with the current value, then follow dynamic updates through the same setter.
    setSlowReadLoggingThreshold(clusterSettings.get(REMOTE_STORE_SLOW_READ_LOGGING_THRESHOLD_SETTING));
    clusterSettings.addSettingsUpdateConsumer(REMOTE_STORE_SLOW_READ_LOGGING_THRESHOLD_SETTING, this::setSlowReadLoggingThreshold);
}
72
+
73
+ private void setSlowReadLoggingThreshold (TimeValue slowReadLoggingThreshold ) {
74
+ this .slowReadLoggingThreshold = slowReadLoggingThreshold ;
51
75
}
52
76
53
77
@ Override
@@ -71,10 +95,13 @@ public void writeAsync(final U entity, final ActionListener<Void> listener) {
71
95
72
96
@ Override
73
97
public T read (final U entity ) throws IOException {
74
- // TODO Add timing logs and tracing
75
98
assert entity .getFullBlobName () != null ;
99
+ final long readStartTimeNS = System .nanoTime ();
76
100
try (InputStream inputStream = transferService .downloadBlob (getBlobPathForDownload (entity ), entity .getBlobFileName ())) {
77
- return entity .deserialize (inputStream );
101
+ final long deserializeStartTimeNS = System .nanoTime ();
102
+ T deserializedBlobEntity = entity .deserialize (inputStream );
103
+ warnAboutSlowReadIfNeeded (entity , deserializeStartTimeNS , readStartTimeNS );
104
+ return deserializedBlobEntity ;
78
105
}
79
106
}
80
107
@@ -122,4 +149,18 @@ private static String encodeString(String content) {
122
149
return Base64 .getUrlEncoder ().withoutPadding ().encodeToString (content .getBytes (StandardCharsets .UTF_8 ));
123
150
}
124
151
152
+ private void warnAboutSlowReadIfNeeded (final U entity , final long deserializeStartTimeNS , final long readStartTimeNS ) {
153
+ long totalReadTimeMS = Math .max (0 , TimeValue .nsecToMSec (System .nanoTime () - readStartTimeNS ));
154
+ long serdeTimeMS = Math .max (0 , TimeValue .nsecToMSec (System .nanoTime () - deserializeStartTimeNS ));
155
+ if (totalReadTimeMS > slowReadLoggingThreshold .getMillis ()) {
156
+ logger .warn (
157
+ "entity [{}] for [{}] took [{}] for serde out of total read time [{}] which is above the total warn threshold of [{}]" ,
158
+ entity .getClass ().getSimpleName (),
159
+ entity .getBlobFileName (),
160
+ serdeTimeMS ,
161
+ totalReadTimeMS ,
162
+ slowReadLoggingThreshold
163
+ );
164
+ }
165
+ }
125
166
}
0 commit comments