101101import java .util .Optional ;
102102import java .util .UUID ;
103103import java .util .concurrent .ConcurrentHashMap ;
104- import java .util .concurrent .ConcurrentMap ;
105104import java .util .concurrent .CountDownLatch ;
106105import java .util .concurrent .ExecutorService ;
107106import java .util .concurrent .Executors ;
@@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
491490 indexShard .hashCode ()
492491 );
493492 // test the mapping
494- ConcurrentMap <ShardId , ConcurrentMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
493+ ConcurrentHashMap <ShardId , ConcurrentHashMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager
494+ .getCleanupKeyToCountMap ();
495495 // shard id should exist
496496 assertTrue (cleanupKeyToCountMap .containsKey (shardId ));
497497 // reader CacheKeyId should NOT exist
@@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
554554 );
555555
556556 // test the mapping
557- ConcurrentMap <ShardId , ConcurrentMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
557+ ConcurrentHashMap <ShardId , ConcurrentHashMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager
558+ .getCleanupKeyToCountMap ();
558559 // shard id should exist
559560 assertTrue (cleanupKeyToCountMap .containsKey (shardId ));
560561 // reader CacheKeyId should NOT exist
@@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
722723 cache .getOrCompute (getEntity (indexShard ), getLoader (reader ), reader , getTermBytes ());
723724 assertEquals (1 , cache .count ());
724725 // test the mappings
725- ConcurrentMap <ShardId , ConcurrentMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
726+ ConcurrentHashMap <ShardId , ConcurrentHashMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager
727+ .getCleanupKeyToCountMap ();
726728 assertEquals (1 , (int ) cleanupKeyToCountMap .get (shardId ).get (getReaderCacheKeyId (reader )));
727729
728730 cache .getOrCompute (getEntity (indexShard ), getLoader (secondReader ), secondReader , getTermBytes ());
@@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
796798 }
797799
798800 // test adding to cleanupKeyToCountMap with multiple threads
799- public void testAddToCleanupKeyToCountMap () throws Exception {
801+ public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads () throws Exception {
800802 threadPool = getThreadPool ();
801803 Settings settings = Settings .builder ().put (INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING .getKey (), "51%" ).build ();
802804 cache = getIndicesRequestCache (settings );
803805
804806 int numberOfThreads = 10 ;
805807 int numberOfIterations = 1000 ;
806808 Phaser phaser = new Phaser (numberOfThreads + 1 ); // +1 for the main thread
807- AtomicBoolean exceptionDetected = new AtomicBoolean (false );
809+ AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean (false );
808810
809811 ExecutorService executorService = Executors .newFixedThreadPool (numberOfThreads );
810812
@@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
817819 }
818820 } catch (ConcurrentModificationException e ) {
819821 logger .error ("ConcurrentModificationException detected in thread : " + e .getMessage ());
820- exceptionDetected .set (true ); // Set flag if exception is detected
822+ concurrentModificationExceptionDetected .set (true ); // Set flag if exception is detected
821823 }
822824 });
823825 }
@@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
836838 }
837839 } catch (ConcurrentModificationException e ) {
838840 logger .error ("ConcurrentModificationException detected in main thread : " + e .getMessage ());
839- exceptionDetected .set (true ); // Set flag if exception is detected
841+ concurrentModificationExceptionDetected .set (true ); // Set flag if exception is detected
840842 }
841843 });
842844
843845 executorService .shutdown ();
844- executorService .awaitTermination (60 , TimeUnit .SECONDS );
845- assertFalse (exceptionDetected .get ());
846+ assertTrue (executorService .awaitTermination (60 , TimeUnit .SECONDS ));
847+ assertEquals (
848+ numberOfThreads * numberOfIterations ,
849+ cache .cacheCleanupManager .getCleanupKeyToCountMap ().get (indexShard .shardId ()).size ()
850+ );
851+ assertFalse (concurrentModificationExceptionDetected .get ());
846852 }
847853
848854 private IndicesRequestCache getIndicesRequestCache (Settings settings ) {
0 commit comments