package org.opensearch.cache.common.tier;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
@@ -61,6 +67,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

    // Used to avoid caching stale entries in lower tiers.
    private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
+    private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

    private final ICache<K, V> diskCache;
    private final ICache<K, V> onHeapCache;
@@ -86,6 +93,12 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
    private final Map<ICache<K, V>, TierInfo> caches;
    private final List<Predicate<V>> policies;

+    /**
+     * This map is used to handle concurrent requests for the same key in computeIfAbsent() to ensure we load the value
+     * only once.
+     */
+    Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> completableFutureMap = new ConcurrentHashMap<>();
+
    TieredSpilloverCache(Builder<K, V> builder) {
        Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
        Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
@@ -190,10 +203,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
        // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
        // This is needed as there can be many requests for the same key at the same time and we only want to load
        // the value once.
-        V value = null;
-        try (ReleasableLock ignore = writeLock.acquire()) {
-            value = onHeapCache.computeIfAbsent(key, loader);
-        }
+        V value = compute(key, loader);
        // Handle stats
        if (loader.isLoaded()) {
            // The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
@@ -222,6 +232,57 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
        return cacheValueTuple.v1();
    }

+    private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
+        // Only one of the threads will succeed in putting a future into the map for the same key.
+        // The rest will fetch the existing future and wait on that to complete.
+        CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
+        // Handler for post-processing the result. Takes a tuple<key, value> or an exception as input and, before
+        // returning, puts the loaded value into the onHeap cache.
+        BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
+            if (pair != null) {
+                try (ReleasableLock ignore = writeLock.acquire()) {
+                    onHeapCache.put(pair.v1(), pair.v2());
+                } catch (Exception e) {
+                    // TODO: Catch specific exceptions to know whether this resulted from the cache or underlying removal
+                    // listeners/stats. Needs better exception handling at underlying layers. For now swallowing
+                    // the exception.
+                    logger.warn("Exception occurred while putting item onto heap cache", e);
+                }
+            } else {
+                if (ex != null) {
+                    logger.warn("Exception occurred while trying to compute the value", ex);
+                }
+            }
+            completableFutureMap.remove(key); // Remove key from map as it is not needed anymore.
+            return null;
+        };
+        V value = null;
+        if (future == null) {
+            future = completableFutureMap.get(key);
+            future.handle(handler);
+            try {
+                value = loader.load(key);
+            } catch (Exception ex) {
+                future.completeExceptionally(ex);
+                throw new ExecutionException(ex);
+            }
+            if (value == null) {
+                NullPointerException npe = new NullPointerException("Loader returned a null value");
+                future.completeExceptionally(npe);
+                throw new ExecutionException(npe);
+            } else {
+                future.complete(new Tuple<>(key, value));
+            }
+        } else {
+            try {
+                value = future.get().v2();
+            } catch (InterruptedException ex) {
+                throw new IllegalStateException(ex);
+            }
+        }
+        return value;
+    }
+
    @Override
    public void invalidate(ICacheKey<K> key) {
        // We are trying to invalidate the key from all caches though it would be present in only one of them.
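
As an aside for review (not part of this diff): the new completableFutureMap field and compute() method above implement a per-key future deduplication, where the first caller for a key wins the putIfAbsent race and performs the load once while concurrent callers block on the shared future. A minimal standalone sketch of that pattern follows; the class and method names (DedupLoader, load) are hypothetical, and it omits the handler that writes the result back into the on-heap tier.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class DedupLoader<K, V> {
    private final Map<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    V load(K key, Function<K, V> loader) {
        CompletableFuture<V> ours = new CompletableFuture<>();
        // Only one thread wins the putIfAbsent race for a given key (putIfAbsent returns null for the winner).
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, ours);
        if (existing == null) {
            try {
                V value = loader.apply(key); // the winner performs the (possibly expensive) load exactly once
                ours.complete(value);
                return value;
            } catch (RuntimeException e) {
                ours.completeExceptionally(e); // waiters see the same failure instead of hanging
                throw e;
            } finally {
                inFlight.remove(key); // the map entry is only needed while the load is in flight
            }
        }
        return existing.join(); // losing threads wait for the winner's result (or its exception)
    }
}

In the diff above, the map cleanup and the on-heap put happen in a handle() callback attached by the loading thread rather than in a finally block, but the deduplication idea is the same.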
@@ -328,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
        ICacheKey<K> key = notification.getKey();
        boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
        boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
-        if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
+        boolean exceptionOccurredOnDiskCachePut = false;
+        boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
+        if (canCacheOnDisk) {
            try (ReleasableLock ignore = writeLock.acquire()) {
                diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
+            } catch (Exception ex) {
+                // TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing the exception
+                // in this case as it shouldn't cause the upstream request to fail.
+                logger.warn("Exception occurred while putting item to disk cache", ex);
+                exceptionOccurredOnDiskCachePut = true;
            }
-            updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
-        } else {
+            if (!exceptionOccurredOnDiskCachePut) {
+                updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
+            }
+        }
+        if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) {
            // If the value is not going to the disk cache, send this notification to the TSC's removal listener
            // as the value is leaving the TSC entirely
            removalListener.onRemoval(notification);
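
For reference, a minimal sketch (not part of this diff) of the spill-or-notify control flow this hunk gives handleRemovalFromHeapTier; the names SpilloverFlowSketch, spillOrNotify, diskPut, recordDiskStats, and notifyRemoval are hypothetical stand-ins for the real calls.

class SpilloverFlowSketch {
    // Assumption: canCacheOnDisk stands for "disk tier enabled AND entry was evicted AND policies accept it".
    static void spillOrNotify(boolean canCacheOnDisk, Runnable diskPut, Runnable recordDiskStats, Runnable notifyRemoval) {
        boolean diskPutFailed = false;
        if (canCacheOnDisk) {
            try {
                diskPut.run(); // spill the evicted entry to the disk tier
            } catch (RuntimeException e) {
                diskPutFailed = true; // swallowed so a disk failure does not fail the upstream request
            }
            if (!diskPutFailed) {
                recordDiskStats.run(); // disk-tier stats are only updated when the put succeeded
            }
        }
        if (!canCacheOnDisk || diskPutFailed) {
            notifyRemoval.run(); // the value leaves the tiered cache entirely
        }
    }
}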