@@ -215,7 +215,7 @@ func RunTestInvocationTimeout(g *gomega.WithT, namedMap coherence.NamedMap[int,
215215func TestBasicOperationsAgainstMapAndCache (t * testing.T ) {
216216 g := gomega .NewWithT (t )
217217
218- session , err := utils .GetSession ()
218+ session , err := utils .GetSession (coherence . WithAddress ( "coherence:///localhost:7574" ) )
219219 g .Expect (err ).ShouldNot (gomega .HaveOccurred ())
220220 defer session .Close ()
221221
@@ -254,6 +254,7 @@ func TestBasicOperationsAgainstMapAndCache(t *testing.T) {
254254 {"NamedCacheRunTestEntrySetFilter" , utils .GetNamedCache [int , utils.Person ](g , session , "entryset-filter-cache" ), RunTestEntrySetFilter },
255255 {"NamedMapRunTestEntrySetFilterWithComparator" , utils .GetNamedMap [int , utils.Person ](g , session , "entryset-filter-map-comparator" ), RunTestEntrySetFilterWithComparator },
256256 {"NamedCacheRunTestEntrySetFilterWithComparator" , utils .GetNamedCache [int , utils.Person ](g , session , "entryset-filter-cache-comparator" ), RunTestEntrySetFilterWithComparator },
257+ {"NamedCacheRunTestEntrySetFilterWithComparatorStream" , utils .GetNamedCache [int , utils.Person ](g , session , "entryset-filter-cache-comparator-stream" ), RunTestEntrySetFilterWithComparatorStream },
257258 {"NamedMapRunTestKeySetFilter" , utils .GetNamedMap [int , utils.Person ](g , session , "keyset-map" ), RunTestKeySetFilter },
258259 {"NamedCacheRunTestKeySetFilter" , utils .GetNamedCache [int , utils.Person ](g , session , "keyset-cache" ), RunTestKeySetFilter },
259260 {"NamedMapRunTestGetAll" , utils .GetNamedMap [int , utils.Person ](g , session , "getall-filter-map" ), RunTestGetAll },
@@ -901,6 +902,92 @@ func RunTestEntrySetFilterWithComparator(t *testing.T, namedMap coherence.NamedM
901902 g .Expect (len (results )).To (gomega .Equal (len (peopleData )))
902903}
903904
905+ func RunTestEntrySetFilterWithComparatorStream (t * testing.T , namedMap coherence.NamedMap [int , utils.Person ]) {
906+ var (
907+ g = gomega .NewWithT (t )
908+ comparatorAscending = extractors .ExtractorComparator (extractors.Extract [int ]("age" ), true )
909+ count = 0
910+ maxPeople = 10_000
911+ )
912+
913+ if namedMap .GetSession ().GetProtocolVersion () == 0 {
914+ // skip as not supported in V0
915+ return
916+ }
917+
918+ utils .ClearNamedMap (g , namedMap )
919+
920+ // populate the cache
921+ for i := 1 ; i <= maxPeople ; i ++ {
922+ _ , err := namedMap .Put (ctx , i , utils.Person {
923+ ID : i ,
924+ Name : fmt .Sprintf ("name-%d" , i ),
925+ Age : 10 + (i % 100 ),
926+ })
927+ g .Expect (err ).NotTo (gomega .HaveOccurred ())
928+
929+ }
930+ utils .AssertSize (g , namedMap , maxPeople )
931+
932+ // only retrieve 300 entries and then skip out
933+ ch := coherence .EntrySetFilterWithComparator (ctx , namedMap , filters .Always (), comparatorAscending )
934+ for se := range ch {
935+ g .Expect (se .Err ).ShouldNot (gomega .HaveOccurred ())
936+ g .Expect (se .Value ).ShouldNot (gomega .BeNil ())
937+ count ++
938+ if count == 100 {
939+ // exit before all entries read
940+ break
941+ }
942+ }
943+
944+ // because we exited potentially before all entries were read, drain the channel
945+ drainWithIdleTimeout (ch , 2 * time .Second )
946+
947+ // run a go routine to access the same cache again
948+ var wg sync.WaitGroup
949+ wg .Add (1 )
950+
951+ go func () {
952+ defer wg .Done ()
953+ count = 0
954+ // tun the same query again
955+ ch2 := coherence .EntrySetFilterWithComparator (ctx , namedMap , filters .Always (), comparatorAscending )
956+ for se := range ch2 {
957+ g .Expect (se .Err ).ShouldNot (gomega .HaveOccurred ())
958+ count ++
959+ }
960+
961+ g .Expect (count ).To (gomega .Equal (maxPeople ))
962+ }()
963+
964+ wg .Wait ()
965+ }
966+
967+ func drainWithIdleTimeout [T any ](ch <- chan T , timeout time.Duration ) {
968+ go func () {
969+ timer := time .NewTimer (timeout )
970+ defer timer .Stop ()
971+
972+ for {
973+ select {
974+ case val , ok := <- ch :
975+ if ! ok {
976+ return
977+ }
978+ if ! timer .Stop () {
979+ <- timer .C
980+ }
981+ timer .Reset (timeout )
982+ _ = val // discard
983+
984+ case <- timer .C :
985+ return
986+ }
987+ }
988+ }()
989+ }
990+
904991func RunTestKeySetFilter (t * testing.T , namedMap coherence.NamedMap [int , utils.Person ]) {
905992 var (
906993 g = gomega .NewWithT (t )
0 commit comments