diff --git a/Makefile b/Makefile index 96c2166..ba1403b 100644 --- a/Makefile +++ b/Makefile @@ -244,7 +244,7 @@ generate-proto-v1: $(TOOLS_BIN)/protoc ## Generate Proto Files v1 # ---------------------------------------------------------------------------------------------------------------------- .PHONY: show-docs show-docs: ## Show the Documentation - @echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/" + @echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/v2" go install golang.org/x/tools/cmd/godoc@latest godoc -goroot $(GOROOT) -http=:6060 @@ -421,7 +421,7 @@ getcopyright: ## Download copyright jar locally if necessary. $(TOOLS_BIN)/protoc: @mkdir -p $(TOOLS_BIN) ./scripts/download-protoc.sh $(TOOLS_DIRECTORY) - go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.30.0 + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.33.0 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0 diff --git a/coherence/common.go b/coherence/common.go index 7cd84d5..45c0f38 100644 --- a/coherence/common.go +++ b/coherence/common.go @@ -44,6 +44,9 @@ const ( // envResolverDebug enables resolver debug messages to be displayed. envResolverDebug = "COHERENCE_RESOLVER_DEBUG" + // envResolverDebug sets the number of retries when the resolver fails. + envResolverRetries = "COHERENCE_RESOLVER_RETRIES" + // envResolverDebug enables randomization of addresses returned by resolver envResolverRandomize = "COHERENCE_RESOLVER_RANDOMIZE" diff --git a/coherence/event.go b/coherence/event.go index 36260c7..39e9f72 100644 --- a/coherence/event.go +++ b/coherence/event.go @@ -60,7 +60,7 @@ const ( // MapEventType describes an event raised by a cache mutation. type MapEventType string -// MapLifecycleEventType describes an event that may be raised during the lifecycle +// MapLifecycleEventType describes an event type that may be raised during the lifecycle // of a cache. type MapLifecycleEventType string @@ -159,8 +159,13 @@ func (se *sessionLifecycleEvent) String() string { return fmt.Sprintf("SessionLifecycleEvent{source=%v, format=%s}", se.Source(), se.Type()) } +// MapLifecycleEvent describes an event that may be raised during the lifecycle +// of a cache. type MapLifecycleEvent[K comparable, V any] interface { + // Source returns the source of this MapLifecycleEvent. Source() NamedMap[K, V] + + // Type returns the MapLifecycleEventType for this MapLifecycleEvent. Type() MapLifecycleEventType } @@ -186,22 +191,39 @@ func (l *mapLifecycleEvent[K, V]) Source() NamedMap[K, V] { return l.source } -// String returns a string representation of a MapLifecycleEvent. +// String returns a string representation of a [MapLifecycleEvent]. func (l *mapLifecycleEvent[K, V]) String() string { return fmt.Sprintf("MapLifecycleEvent{source=%v, type=%s}", l.Source().GetCacheName(), l.Type()) } -// MapEvent an event which indicates that the content of the NamedMap or -// NamedCache has changed (i.e., an entry has been added, updated, and/or +// MapEvent an event which indicates that the content of the [NamedMap] or +// [NamedCache] has changed (i.e., an entry has been added, updated, and/or // removed). type MapEvent[K comparable, V any] interface { + // Source returns the source of this MapEvent. Source() NamedMap[K, V] + + // Key returns the key of the entry for which this event was raised. Key() (*K, error) + + // OldValue returns the old value, if any, of the entry for which this event + // was raised. OldValue() (*V, error) + + // NewValue returns the new value, if any, of the entry for which this event + // was raised. NewValue() (*V, error) + + // Type returns the MapEventType for this MapEvent. Type() MapEventType + + // IsExpired returns true if the event was generated from an expiry event. Only valid for gRPC v1 connections. IsExpired() (bool, error) + + // IsPriming returns true if the event is a priming event. Only valid for gRPC v1 connections. IsPriming() (bool, error) + + // IsSynthetic returns true if the event is a synthetic event. Only valid for gRPC v1 connections. IsSynthetic() (bool, error) } @@ -1197,7 +1219,7 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam bc.filterListenersV1 = make(map[filters.Filter]*listenerGroupV1[K, V], 0) bc.filterIDToGroupV1 = make(map[int64]*listenerGroupV1[K, V], 0) - // re-ensure all the caches as the connected has gone and so has the gRPC Proxy + // re-ensure all the caches as the connection has gone and so has the gRPC Proxy for _, c := range cacheNames { cacheID, err3 := bc.session.v1StreamManagerCache.ensureCache(context.Background(), c) if err3 != nil { diff --git a/coherence/localcache.go b/coherence/localcache.go index 3d8f228..ceae6ce 100644 --- a/coherence/localcache.go +++ b/coherence/localcache.go @@ -43,7 +43,7 @@ type localCache[K comparable, V any] interface { GetStats() CacheStats } -// CacheStats defines various statics for near caches. +// CacheStats contains various statistics for near caches. type CacheStats interface { GetCacheHits() int64 // the number of entries served from the near cache GetCacheMisses() int64 // the number of entries that had to be retrieved from the cluster @@ -479,7 +479,8 @@ func (l *localCacheImpl[K, V]) String() string { // updateEntrySize updates the cacheMemory size based upon a local entry. The sign indicates to either remove or add. func (l *localCacheImpl[K, V]) updateEntrySize(entry *localCacheEntry[K, V], sign int) { l.updateCacheMemory(int64(sign)*(int64(unsafe.Sizeof(entry.key))+int64(unsafe.Sizeof(entry.value))+ - (int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) + (int64(unsafe.Sizeof(entry.lastAccess)))) + (int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) + + (int64(unsafe.Sizeof(entry.lastAccess))) + int64(unsafe.Sizeof(entry))) } func formatMemory(bytesValue int64) string { diff --git a/coherence/queue_events.go b/coherence/queue_events.go index 493e2a5..1b0284b 100644 --- a/coherence/queue_events.go +++ b/coherence/queue_events.go @@ -57,9 +57,16 @@ func (l *queueLifecycleEvent[V]) String() string { // QueueLifecycleListener allows registering callbacks to be notified when lifecycle events // (truncated, released or destroyed) occur against a [NamedQueue]. type QueueLifecycleListener[V any] interface { + // OnAny registers a callback that will be notified when any [NamedQueue] event occurs. OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] + + // OnDestroyed registers a callback that will be notified when a [NamedQueue] is destroyed. OnDestroyed(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] + + // OnTruncated registers a callback that will be notified when a [Queue] is truncated. OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] + + // OnReleased registers a callback that will be notified when a [NamedQueue] is released. OnReleased(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] getEmitter() *eventEmitter[QueueLifecycleEventType, QueueLifecycleEvent[V]] } @@ -89,7 +96,7 @@ func (q *queueLifecycleListener[V]) OnReleased(callback func(QueueLifecycleEvent return q.on(QueueReleased, callback) } -// OnTruncated registers a callback that will be notified when a [NamedMap] is truncated. +// OnTruncated registers a callback that will be notified when a [Queue] is truncated. func (q *queueLifecycleListener[V]) OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] { return q.on(QueueTruncated, callback) } @@ -98,7 +105,7 @@ func (q *queueLifecycleListener[V]) getEmitter() *eventEmitter[QueueLifecycleEve return q.emitter } -// OnAny registers a callback that will be notified when any [NamedMap] event occurs. +// OnAny registers a callback that will be notified when any [NamedQueue] event occurs. func (q *queueLifecycleListener[V]) OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] { return q.OnTruncated(callback).OnDestroyed(callback).OnReleased(callback) } diff --git a/coherence/resolver.go b/coherence/resolver.go index 53874d2..67b4b9b 100644 --- a/coherence/resolver.go +++ b/coherence/resolver.go @@ -12,13 +12,16 @@ import ( "github.com/oracle/coherence-go-client/v2/coherence/discovery" "google.golang.org/grpc/resolver" "math/rand" + "strconv" "strings" "sync" "time" ) const ( - nsLookupScheme = "coherence" + nsLookupScheme = "coherence" + defaultRetries = 20 + defaultResolverDelay = 1000 // ms ) var ( @@ -42,16 +45,27 @@ func (b *nsLookupResolverBuilder) Build(target resolver.Target, cc resolver.Clie } checkResolverDebug() + // set the number of resolver retried + retries := getStringValueFromEnvVarOrDefault(envResolverRetries, "20") + retriesValue, err := strconv.Atoi(retries) + if err != nil { + retriesValue = defaultRetries + } + + resolverDebug("resolver retries=%v", retriesValue) + r.resolverRetries = retriesValue + r.start() return r, nil } func (*nsLookupResolverBuilder) Scheme() string { return nsLookupScheme } type nsLookupResolver struct { - target resolver.Target - cc resolver.ClientConn - mutex sync.Mutex - addrStore map[string][]string + target resolver.Target + cc resolver.ClientConn + mutex sync.Mutex + addrStore map[string][]string + resolverRetries int } func (r *nsLookupResolver) resolve() { @@ -60,10 +74,10 @@ func (r *nsLookupResolver) resolve() { defer r.mutex.Unlock() if len(grpcEndpoints) == 0 { - // try 8 times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over - for i := 0; i < 8; i++ { - resolverDebug("retrying NSLookup attempt", i) - time.Sleep(time.Duration(250) * time.Millisecond) + // try r.resolverRetries; times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over + for i := 1; i <= r.resolverRetries; i++ { + resolverDebug("retrying NSLookup attempt: %v", i) + time.Sleep(time.Duration(defaultResolverDelay) * time.Millisecond) grpcEndpoints = generateNSAddresses(r.target.Endpoint()) if len(grpcEndpoints) != 0 { break diff --git a/coherence/serializers.go b/coherence/serializers.go index 42e573b..dc6320d 100644 --- a/coherence/serializers.go +++ b/coherence/serializers.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. */ @@ -28,8 +28,13 @@ type mathValue[T any] struct { // Serializer defines how to serialize/ de-serialize objects. type Serializer[T any] interface { + // Serialize serializes an object of type T and returns the []byte representation. Serialize(object T) ([]byte, error) + + // Deserialize deserialized an object and returns the correct type of T. Deserialize(data []byte) (*T, error) + + // Format returns the format used for the serializer. Format() string } @@ -111,6 +116,7 @@ func (s JSONSerializer[T]) Deserialize(data []byte) (*T, error) { return &zeroValue, fmt.Errorf("invalid serialization prefix %v", data[0]) } +// Format returns the format used for the serializer. func (s JSONSerializer[T]) Format() string { return s.format } diff --git a/coherence/session.go b/coherence/session.go index 733caec..427b998 100644 --- a/coherence/session.go +++ b/coherence/session.go @@ -19,6 +19,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" "log" "os" "reflect" @@ -510,7 +511,14 @@ func (s *Session) ensureConnection() error { s.v1StreamManagerCache = manager apiMessage = fmt.Sprintf(" %v", manager) } else { - s.debug("error connecting to session via v1, falling back to v0: %v", err1) + // check if this is a gRPC status error + if sts, ok := status.FromError(err1); ok { + if sts.Message() == "Method not found: coherence.proxy.v1.ProxyService/subChannel" { + s.debug("error connecting to session via v1, falling back to v0: %v", err1) + } else { + s.debug("received a different gRPC error: %v", err1) + } + } } logMessage(INFO, "Session [%s] connected to [%s]%s", s.sessionID, s.sessOpts.Address, apiMessage) @@ -547,14 +555,14 @@ func (s *Session) ensureConnection() error { return } - if newState == connectivity.Ready || newState == connectivity.Idle { + if newState == connectivity.Ready { if !firstConnect && !connected { - // Reconnect + // Reconnected disconnectTime = 0 session.closed = false connected = true - logMessage(INFO, "Session [%s] re-connected to address %s", session.sessionID, session.sessOpts.Address) + logMessage(INFO, "Session [%s] re-connected to address %s (%v)", session.sessionID, session.sessOpts.Address, newState) session.dispatch(Reconnected, func() SessionLifecycleEvent { return newSessionLifecycleEvent(session, Reconnected) }) diff --git a/coherence/session_test.go b/coherence/session_test.go index 75cda99..04b620d 100644 --- a/coherence/session_test.go +++ b/coherence/session_test.go @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022, 2023 Oracle and/or its affiliates. +* Copyright (c) 2022, 2024 Oracle and/or its affiliates. * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. */ @@ -9,6 +9,7 @@ package coherence import ( "context" "github.com/onsi/gomega" + "os" "strconv" "testing" "time" @@ -21,6 +22,8 @@ func TestSessionValidation(t *testing.T) { ctx = context.Background() ) + os.Setenv("COHERENCE_SESSION_DEBUG", "true") + _, err = NewSession(ctx, WithFormat("not-json")) g.Expect(err).To(gomega.Equal(ErrInvalidFormat)) diff --git a/coherence/v1client.go b/coherence/v1client.go index ab996ff..de65bad 100644 --- a/coherence/v1client.go +++ b/coherence/v1client.go @@ -843,6 +843,7 @@ func (m *streamManagerV1) putGenericRequest(ctx context.Context, reqType pb1.Nam return unwrapBytes(result) } +// BinaryKeyAndValue is an internal type exported only for serialization. type BinaryKeyAndValue struct { Key []byte Value []byte @@ -850,12 +851,14 @@ type BinaryKeyAndValue struct { Cookie []byte } +// BinaryKey is an internal type exported only for serialization. type BinaryKey struct { Key []byte Err error Cookie []byte } +// BinaryValue is an internal type exported only for serialization. type BinaryValue struct { Value []byte Err error diff --git a/java/coherence-go-queues/pom.xml b/java/coherence-go-queues/pom.xml index a49f56c..d0061a7 100644 --- a/java/coherence-go-queues/pom.xml +++ b/java/coherence-go-queues/pom.xml @@ -50,10 +50,7 @@ javax - - - . - + false diff --git a/scripts/run-checkin-test.sh b/scripts/run-checkin-test.sh new file mode 100755 index 0000000..0ab8cf0 --- /dev/null +++ b/scripts/run-checkin-test.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +# +# Copyright (c) 2022, 2024 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at +# https://oss.oracle.com/licenses/upl. +# + +# This script runs some tests that should succeed to be sure we can push +set -e + +echo "Coherence CE 24.09 All Tests gRPC v1" +COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone + +echo "Coherence CE 24.09 with queues" +COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax,queues COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone-queues + +echo "Coherence CE 22.06.10" +COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone \ No newline at end of file diff --git a/test/e2e/standalone/near_cache_test.go b/test/e2e/standalone/near_cache_test.go index c6d07ce..e40c824 100644 --- a/test/e2e/standalone/near_cache_test.go +++ b/test/e2e/standalone/near_cache_test.go @@ -13,7 +13,9 @@ import ( "github.com/oracle/coherence-go-client/v2/coherence/filters" "github.com/oracle/coherence-go-client/v2/coherence/processors" "github.com/oracle/coherence-go-client/v2/test/utils" + "log" "math" + "strconv" "testing" "time" ) @@ -21,6 +23,7 @@ import ( const ( nearCacheName = "near-cache" nearMapName = "near-map" + noNearCache = "no-near-cache" ) // TestNearCacheOperationsAgainstMapAndCache runs all near cache tests against NamedMap and NamedCache. @@ -536,12 +539,12 @@ func TestDuplicateNamedCache(t *testing.T) { namedMap.Release() // test creating a NamedCache WITHOUT near cache and then trying to get a NamedCache WITH near cache - namedCache, err = coherence.GetNamedCache[int, string](session, "no-near-cache") + namedCache, err = coherence.GetNamedCache[int, string](session, noNearCache) g.Expect(namedCache).To(gomega.Not(gomega.BeNil())) g.Expect(err).ShouldNot(gomega.HaveOccurred()) // try to get the same cache name with near cache config, should fail - _, err = coherence.GetNamedCache[int, string](session, "no-near-cache", coherence.WithNearCache(&nearCacheOptions10Seconds)) + _, err = coherence.GetNamedCache[int, string](session, noNearCache, coherence.WithNearCache(&nearCacheOptions10Seconds)) fmt.Println(err) g.Expect(err).Should(gomega.HaveOccurred()) @@ -583,6 +586,66 @@ func TestNearCachePruneFactor(t *testing.T) { g.Expect(coherence.GetNearCachePruneFactor[int, string](namedCache)).To(gomega.Equal(float32(0.8))) } +// TestNearCacheComparison runs tests to compare near and normal cache and outputs size and memory usage. +func TestNearCacheComparison(t *testing.T) { + g := gomega.NewWithT(t) + session, err := utils.GetSession() + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer session.Close() + + const maxValues = 2_000 + + nearCacheOptions := &coherence.NearCacheOptions{HighUnits: maxValues * 2} + namedCache, err := coherence.GetNamedCache[string, string](session, noNearCache) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + + namedCacheNear, err := coherence.GetNamedCache[string, string](session, nearCacheName, coherence.WithNearCache(nearCacheOptions)) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + + g.Expect(namedCache.Clear(ctx)).ShouldNot(gomega.HaveOccurred()) + g.Expect(namedCacheNear.Clear(ctx)).ShouldNot(gomega.HaveOccurred()) + + values := make(map[string]string, 0) + + // populate the map + for i := 1; i <= maxValues; i++ { + kv := strconv.Itoa(i) + values[kv] = kv + } + + log.Printf("Insert %v entries into caches", maxValues) + + g.Expect(namedCache.PutAll(ctx, values)).ShouldNot(gomega.HaveOccurred()) + g.Expect(namedCacheNear.PutAll(ctx, values)).ShouldNot(gomega.HaveOccurred()) + + log.Println("Start", maxValues, "gets on normal cache") + start := time.Now() + for i := 1; i <= maxValues; i++ { + kv := strconv.Itoa(i) + _, err = namedCache.Get(ctx, kv) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + + log.Printf("Time to get %v from normal cache is %v", maxValues, time.Since(start)) + + for j := 1; j <= 2; j++ { + log.Printf("Run %v of get %v gets on near cache", j, maxValues) + start = time.Now() + for i := 1; i <= maxValues; i++ { + kv := strconv.Itoa(i) + _, err = namedCacheNear.Get(ctx, kv) + g.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + + log.Printf("Run: %v time to get %v from near cache is %v", j, maxValues, time.Since(start)) + } + + log.Println(namedCacheNear.GetNearCacheStats()) + + g.Expect(namedCacheNear.Destroy(ctx)).ShouldNot(gomega.HaveOccurred()) + g.Expect(namedCache.Destroy(ctx)).ShouldNot(gomega.HaveOccurred()) +} + // TestInvalidNearCacheOptions runs tests to ensure that we can't create a named cache/map with invalid options. func TestInvalidNearCacheOptions(t *testing.T) { var (