Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ generate-proto-v1: $(TOOLS_BIN)/protoc ## Generate Proto Files v1
# ----------------------------------------------------------------------------------------------------------------------
.PHONY: show-docs
show-docs: ## Show the Documentation
@echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/"
@echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/v2"
go install golang.org/x/tools/cmd/godoc@latest
godoc -goroot $(GOROOT) -http=:6060

Expand Down Expand Up @@ -421,7 +421,7 @@ getcopyright: ## Download copyright jar locally if necessary.
$(TOOLS_BIN)/protoc:
@mkdir -p $(TOOLS_BIN)
./scripts/download-protoc.sh $(TOOLS_DIRECTORY)
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.30.0
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.33.0
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0


Expand Down
3 changes: 3 additions & 0 deletions coherence/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (
// envResolverDebug enables resolver debug messages to be displayed.
envResolverDebug = "COHERENCE_RESOLVER_DEBUG"

// envResolverDebug sets the number of retries when the resolver fails.
envResolverRetries = "COHERENCE_RESOLVER_RETRIES"

// envResolverDebug enables randomization of addresses returned by resolver
envResolverRandomize = "COHERENCE_RESOLVER_RANDOMIZE"

Expand Down
32 changes: 27 additions & 5 deletions coherence/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
// MapEventType describes an event raised by a cache mutation.
type MapEventType string

// MapLifecycleEventType describes an event that may be raised during the lifecycle
// MapLifecycleEventType describes an event type that may be raised during the lifecycle
// of a cache.
type MapLifecycleEventType string

Expand Down Expand Up @@ -159,8 +159,13 @@ func (se *sessionLifecycleEvent) String() string {
return fmt.Sprintf("SessionLifecycleEvent{source=%v, format=%s}", se.Source(), se.Type())
}

// MapLifecycleEvent describes an event that may be raised during the lifecycle
// of a cache.
type MapLifecycleEvent[K comparable, V any] interface {
// Source returns the source of this MapLifecycleEvent.
Source() NamedMap[K, V]

// Type returns the MapLifecycleEventType for this MapLifecycleEvent.
Type() MapLifecycleEventType
}

Expand All @@ -186,22 +191,39 @@ func (l *mapLifecycleEvent[K, V]) Source() NamedMap[K, V] {
return l.source
}

// String returns a string representation of a MapLifecycleEvent.
// String returns a string representation of a [MapLifecycleEvent].
func (l *mapLifecycleEvent[K, V]) String() string {
return fmt.Sprintf("MapLifecycleEvent{source=%v, type=%s}", l.Source().GetCacheName(), l.Type())
}

// MapEvent an event which indicates that the content of the NamedMap or
// NamedCache has changed (i.e., an entry has been added, updated, and/or
// MapEvent an event which indicates that the content of the [NamedMap] or
// [NamedCache] has changed (i.e., an entry has been added, updated, and/or
// removed).
type MapEvent[K comparable, V any] interface {
// Source returns the source of this MapEvent.
Source() NamedMap[K, V]

// Key returns the key of the entry for which this event was raised.
Key() (*K, error)

// OldValue returns the old value, if any, of the entry for which this event
// was raised.
OldValue() (*V, error)

// NewValue returns the new value, if any, of the entry for which this event
// was raised.
NewValue() (*V, error)

// Type returns the MapEventType for this MapEvent.
Type() MapEventType

// IsExpired returns true if the event was generated from an expiry event. Only valid for gRPC v1 connections.
IsExpired() (bool, error)

// IsPriming returns true if the event is a priming event. Only valid for gRPC v1 connections.
IsPriming() (bool, error)

// IsSynthetic returns true if the event is a synthetic event. Only valid for gRPC v1 connections.
IsSynthetic() (bool, error)
}

Expand Down Expand Up @@ -1197,7 +1219,7 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
bc.filterListenersV1 = make(map[filters.Filter]*listenerGroupV1[K, V], 0)
bc.filterIDToGroupV1 = make(map[int64]*listenerGroupV1[K, V], 0)

// re-ensure all the caches as the connected has gone and so has the gRPC Proxy
// re-ensure all the caches as the connection has gone and so has the gRPC Proxy
for _, c := range cacheNames {
cacheID, err3 := bc.session.v1StreamManagerCache.ensureCache(context.Background(), c)
if err3 != nil {
Expand Down
5 changes: 3 additions & 2 deletions coherence/localcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type localCache[K comparable, V any] interface {
GetStats() CacheStats
}

// CacheStats defines various statics for near caches.
// CacheStats contains various statistics for near caches.
type CacheStats interface {
GetCacheHits() int64 // the number of entries served from the near cache
GetCacheMisses() int64 // the number of entries that had to be retrieved from the cluster
Expand Down Expand Up @@ -479,7 +479,8 @@ func (l *localCacheImpl[K, V]) String() string {
// updateEntrySize updates the cacheMemory size based upon a local entry. The sign indicates to either remove or add.
func (l *localCacheImpl[K, V]) updateEntrySize(entry *localCacheEntry[K, V], sign int) {
l.updateCacheMemory(int64(sign)*(int64(unsafe.Sizeof(entry.key))+int64(unsafe.Sizeof(entry.value))+
(int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) + (int64(unsafe.Sizeof(entry.lastAccess))))
(int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) +
(int64(unsafe.Sizeof(entry.lastAccess))) + int64(unsafe.Sizeof(entry)))
}

func formatMemory(bytesValue int64) string {
Expand Down
11 changes: 9 additions & 2 deletions coherence/queue_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ func (l *queueLifecycleEvent[V]) String() string {
// QueueLifecycleListener allows registering callbacks to be notified when lifecycle events
// (truncated, released or destroyed) occur against a [NamedQueue].
type QueueLifecycleListener[V any] interface {
// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

// OnDestroyed registers a callback that will be notified when a [NamedQueue] is destroyed.
OnDestroyed(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

// OnReleased registers a callback that will be notified when a [NamedQueue] is released.
OnReleased(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
getEmitter() *eventEmitter[QueueLifecycleEventType, QueueLifecycleEvent[V]]
}
Expand Down Expand Up @@ -89,7 +96,7 @@ func (q *queueLifecycleListener[V]) OnReleased(callback func(QueueLifecycleEvent
return q.on(QueueReleased, callback)
}

// OnTruncated registers a callback that will be notified when a [NamedMap] is truncated.
// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
func (q *queueLifecycleListener[V]) OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] {
return q.on(QueueTruncated, callback)
}
Expand All @@ -98,7 +105,7 @@ func (q *queueLifecycleListener[V]) getEmitter() *eventEmitter[QueueLifecycleEve
return q.emitter
}

// OnAny registers a callback that will be notified when any [NamedMap] event occurs.
// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
func (q *queueLifecycleListener[V]) OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] {
return q.OnTruncated(callback).OnDestroyed(callback).OnReleased(callback)
}
Expand Down
32 changes: 23 additions & 9 deletions coherence/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"github.com/oracle/coherence-go-client/v2/coherence/discovery"
"google.golang.org/grpc/resolver"
"math/rand"
"strconv"
"strings"
"sync"
"time"
)

const (
nsLookupScheme = "coherence"
nsLookupScheme = "coherence"
defaultRetries = 20
defaultResolverDelay = 1000 // ms
)

var (
Expand All @@ -42,16 +45,27 @@ func (b *nsLookupResolverBuilder) Build(target resolver.Target, cc resolver.Clie
}
checkResolverDebug()

// set the number of resolver retried
retries := getStringValueFromEnvVarOrDefault(envResolverRetries, "20")
retriesValue, err := strconv.Atoi(retries)
if err != nil {
retriesValue = defaultRetries
}

resolverDebug("resolver retries=%v", retriesValue)
r.resolverRetries = retriesValue

r.start()
return r, nil
}
func (*nsLookupResolverBuilder) Scheme() string { return nsLookupScheme }

type nsLookupResolver struct {
target resolver.Target
cc resolver.ClientConn
mutex sync.Mutex
addrStore map[string][]string
target resolver.Target
cc resolver.ClientConn
mutex sync.Mutex
addrStore map[string][]string
resolverRetries int
}

func (r *nsLookupResolver) resolve() {
Expand All @@ -60,10 +74,10 @@ func (r *nsLookupResolver) resolve() {
defer r.mutex.Unlock()

if len(grpcEndpoints) == 0 {
// try 8 times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
for i := 0; i < 8; i++ {
resolverDebug("retrying NSLookup attempt", i)
time.Sleep(time.Duration(250) * time.Millisecond)
// try r.resolverRetries; times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
for i := 1; i <= r.resolverRetries; i++ {
resolverDebug("retrying NSLookup attempt: %v", i)
time.Sleep(time.Duration(defaultResolverDelay) * time.Millisecond)
grpcEndpoints = generateNSAddresses(r.target.Endpoint())
if len(grpcEndpoints) != 0 {
break
Expand Down
8 changes: 7 additions & 1 deletion coherence/serializers.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
Expand Down Expand Up @@ -28,8 +28,13 @@ type mathValue[T any] struct {

// Serializer defines how to serialize/ de-serialize objects.
type Serializer[T any] interface {
// Serialize serializes an object of type T and returns the []byte representation.
Serialize(object T) ([]byte, error)

// Deserialize deserialized an object and returns the correct type of T.
Deserialize(data []byte) (*T, error)

// Format returns the format used for the serializer.
Format() string
}

Expand Down Expand Up @@ -111,6 +116,7 @@ func (s JSONSerializer[T]) Deserialize(data []byte) (*T, error) {
return &zeroValue, fmt.Errorf("invalid serialization prefix %v", data[0])
}

// Format returns the format used for the serializer.
func (s JSONSerializer[T]) Format() string {
return s.format
}
16 changes: 12 additions & 4 deletions coherence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"log"
"os"
"reflect"
Expand Down Expand Up @@ -510,7 +511,14 @@ func (s *Session) ensureConnection() error {
s.v1StreamManagerCache = manager
apiMessage = fmt.Sprintf(" %v", manager)
} else {
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
// check if this is a gRPC status error
if sts, ok := status.FromError(err1); ok {
if sts.Message() == "Method not found: coherence.proxy.v1.ProxyService/subChannel" {
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
} else {
s.debug("received a different gRPC error: %v", err1)
}
}
}

logMessage(INFO, "Session [%s] connected to [%s]%s", s.sessionID, s.sessOpts.Address, apiMessage)
Expand Down Expand Up @@ -547,14 +555,14 @@ func (s *Session) ensureConnection() error {
return
}

if newState == connectivity.Ready || newState == connectivity.Idle {
if newState == connectivity.Ready {
if !firstConnect && !connected {
// Reconnect
// Reconnected
disconnectTime = 0
session.closed = false
connected = true

logMessage(INFO, "Session [%s] re-connected to address %s", session.sessionID, session.sessOpts.Address)
logMessage(INFO, "Session [%s] re-connected to address %s (%v)", session.sessionID, session.sessOpts.Address, newState)
session.dispatch(Reconnected, func() SessionLifecycleEvent {
return newSessionLifecycleEvent(session, Reconnected)
})
Expand Down
5 changes: 4 additions & 1 deletion coherence/session_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
Expand All @@ -9,6 +9,7 @@ package coherence
import (
"context"
"github.com/onsi/gomega"
"os"
"strconv"
"testing"
"time"
Expand All @@ -21,6 +22,8 @@ func TestSessionValidation(t *testing.T) {
ctx = context.Background()
)

os.Setenv("COHERENCE_SESSION_DEBUG", "true")

_, err = NewSession(ctx, WithFormat("not-json"))
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))

Expand Down
3 changes: 3 additions & 0 deletions coherence/v1client.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,19 +843,22 @@ func (m *streamManagerV1) putGenericRequest(ctx context.Context, reqType pb1.Nam
return unwrapBytes(result)
}

// BinaryKeyAndValue is an internal type exported only for serialization.
type BinaryKeyAndValue struct {
Key []byte
Value []byte
Err error
Cookie []byte
}

// BinaryKey is an internal type exported only for serialization.
type BinaryKey struct {
Key []byte
Err error
Cookie []byte
}

// BinaryValue is an internal type exported only for serialization.
type BinaryValue struct {
Value []byte
Err error
Expand Down
5 changes: 1 addition & 4 deletions java/coherence-go-queues/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@
<profile>
<id>javax</id>
<activation>
<!-- This is a work-around for the fact that activeByDefault does not do what you'd think it should -->
<file>
<exists>.</exists>
</file>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
Expand Down
19 changes: 19 additions & 0 deletions scripts/run-checkin-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

#
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl.
#

# This script runs some tests that should succeed to be sure we can push
set -e

echo "Coherence CE 24.09 All Tests gRPC v1"
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone

echo "Coherence CE 24.09 with queues"
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax,queues COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone-queues

echo "Coherence CE 22.06.10"
COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone
Loading
Loading