Skip to content

[CAE-1088] feat: RESP3 notifications support #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 45 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov Jun 26, 2025
1ff0ded
feat: enforce single handler per notification type
ndyakov Jun 26, 2025
e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov Jun 26, 2025
d7fbe18
feat: fix connection health check interference with push notifications
ndyakov Jun 26, 2025
1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov Jun 26, 2025
4747610
test: add comprehensive unit tests for 100% coverage
ndyakov Jun 26, 2025
70231ae
refactor: simplify push notification interface
ndyakov Jun 26, 2025
958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov Jun 26, 2025
79f6df2
remove: push-notification-demo
ndyakov Jun 26, 2025
c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov Jun 26, 2025
fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov Jun 26, 2025
be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov Jun 26, 2025
8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov Jun 26, 2025
d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov Jun 26, 2025
a2de263
fix: copy push notification processor to transaction baseClient
ndyakov Jun 26, 2025
ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov Jun 27, 2025
d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov Jun 27, 2025
e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov Jun 27, 2025
03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov Jun 27, 2025
9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov Jun 27, 2025
ada72ce
refactor: move push notification logic to pusnotif package
ndyakov Jun 27, 2025
91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov Jun 27, 2025
e31987f
Fixes tests:
ndyakov Jun 27, 2025
075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov Jun 27, 2025
f7948b5
fix: address pr review
ndyakov Jun 27, 2025
3473c1e
fix: simplify api
ndyakov Jun 27, 2025
d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov Jun 27, 2025
b6e712b
feat: add proactive push notification processing to WithReader
ndyakov Jun 27, 2025
f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov Jun 27, 2025
f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov Jun 27, 2025
cb8a4e5
feat: process push notifications before returning connections from pool
ndyakov Jul 2, 2025
c44c8b5
fix: increase peek notification name bytes
ndyakov Jul 3, 2025
47dd490
feat: enhance push notification handlers with context information
ndyakov Jul 4, 2025
1606de8
feat: implement strongly typed HandlerContext interface
ndyakov Jul 4, 2025
d530d45
feat: implement strongly typed HandlerContext with concrete types in …
ndyakov Jul 4, 2025
5972b4c
refactor: move all push notification logic to root package and remove…
ndyakov Jul 4, 2025
ec4bf57
cleanup: remove redundant internal push notification packages
ndyakov Jul 4, 2025
b4d0ff1
refactor: organize push notification code into separate files
ndyakov Jul 4, 2025
84123b1
refactor(push): completly change the package structure
ndyakov Jul 4, 2025
d780401
refactor(push): simplify handler context
ndyakov Jul 5, 2025
604c8e3
fix(tests): debug logger
ndyakov Jul 5, 2025
b23f43c
fix(peek): non-blocking peek
ndyakov Jul 5, 2025
7a0f316
fix(tests): remove bench_decode tests
ndyakov Jul 5, 2025
225c0bf
fix(tests): add global ctx in tests
ndyakov Jul 5, 2025
2681d6d
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
ndyakov Jul 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Conn struct {
createdAt time.Time

onClose func() error

// Push notification processor for handling push notifications on this connection
PushNotificationProcessor interface {
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
}
}

func NewConn(netConn net.Conn) *Conn {
Expand Down
53 changes: 48 additions & 5 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
)

var (
Expand Down Expand Up @@ -71,6 +72,11 @@ type Options struct {
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

// Push notification processor for connections
PushNotificationProcessor interface {
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
}
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -228,6 +234,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {

cn := NewConn(netConn)
cn.pooled = pooled

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop this line

// Set push notification processor if available
if p.cfg.PushNotificationProcessor != nil {
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor
}

return cn, nil
}

Expand Down Expand Up @@ -377,9 +389,24 @@ func (p *ConnPool) popIdle() (*Conn, error) {

func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
// Check if this might be push notification data
if cn.PushNotificationProcessor != nil {
// Try to process pending push notifications before discarding connection
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
if err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err)
}
// Check again if there's still unread data after processing push notifications
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications")
p.Remove(ctx, cn, BadConnError{})
return
}
Comment on lines +393 to +394
Copy link
Preview

Copilot AI Jul 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This early return in Put prevents the connection from being placed back into the pool, leading to connection leaks. You should still call p.connsMu/p.Put or similar logic after detecting push data, or explicitly return the connection to the pool.

Suggested change
return
}
}
// Allow the connection to proceed to the pool management logic

Copilot uses AI. Check for mistakes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually correct, instead of just returning, we should allow the connection to be put back in the pool

} else {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
}
}

if !cn.pooled {
Expand Down Expand Up @@ -523,8 +550,24 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
return false
}

if connCheck(cn.netConn) != nil {
return false
// Check connection health, but be aware of push notifications
if err := connCheck(cn.netConn); err != nil {
// If there's unexpected data and we have push notification support,
// it might be push notifications
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil {
// Try to process any pending push notifications
ctx := context.Background()
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr)
return false
}
// Check again after processing push notifications
if connCheck(cn.netConn) != nil {
return false
}
} else {
return false
}
}

cn.SetUsedAt(now)
Expand Down
17 changes: 17 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,21 @@ type Options struct {
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
UnstableResp3 bool

// PushNotifications enables general push notification processing.
// When enabled, the client will process RESP3 push notifications and
// route them to registered handlers.
//
// For RESP3 connections (Protocol: 3), push notifications are automatically enabled.
// To disable push notifications for RESP3, use Protocol: 2 instead.
// For RESP2 connections, push notifications are not available.
//
// default: automatically enabled for RESP3, disabled for RESP2
PushNotifications bool

// PushNotificationProcessor is the processor for handling push notifications.
// If nil, a default processor will be created when PushNotifications is enabled.
PushNotificationProcessor PushNotificationProcessorInterface
}

func (opt *Options) init() {
Expand Down Expand Up @@ -592,5 +607,7 @@ func newConnPool(
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
// Pass push notification processor for connection initialization
PushNotificationProcessor: opt.PushNotificationProcessor,
})
}
39 changes: 38 additions & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,21 @@ type PubSub struct {
chOnce sync.Once
msgCh *channel
allCh *channel

// Push notification processor for handling generic push notifications
pushProcessor PushNotificationProcessorInterface
}

func (c *PubSub) init() {
c.exit = make(chan struct{})
}

// SetPushNotificationProcessor sets the push notification processor for handling
// generic push notifications received on this PubSub connection.
func (c *PubSub) SetPushNotificationProcessor(processor PushNotificationProcessorInterface) {
c.pushProcessor = processor
}

func (c *PubSub) String() string {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -367,6 +376,18 @@ func (p *Pong) String() string {
return "Pong"
}

// PushNotificationMessage represents a generic push notification received on a PubSub connection.
type PushNotificationMessage struct {
// Command is the push notification command (e.g., "MOVING", "CUSTOM_EVENT").
Command string
// Args are the arguments following the command.
Args []interface{}
}

func (m *PushNotificationMessage) String() string {
return fmt.Sprintf("push: %s", m.Command)
}

func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
switch reply := reply.(type) {
case string:
Expand Down Expand Up @@ -413,6 +434,19 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
Payload: reply[1].(string),
}, nil
default:
// Try to handle as generic push notification
ctx := c.getContext()
registry := c.pushProcessor.GetRegistry()
if registry != nil {
handled := registry.HandleNotification(ctx, reply)
if handled {
// Return a special message type to indicate it was handled
return &PushNotificationMessage{
Command: kind,
Args: reply[1:],
}, nil
}
}
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
}
default:
Expand Down Expand Up @@ -658,6 +692,9 @@ func (c *channel) initMsgChan() {
// Ignore.
case *Pong:
// Ignore.
case *PushNotificationMessage:
// Ignore push notifications in message-only channel
// They are already handled by the push notification processor
case *Message:
timer.Reset(c.chanSendTimeout)
select {
Expand Down Expand Up @@ -712,7 +749,7 @@ func (c *channel) initAllChan() {
switch msg := msg.(type) {
case *Pong:
// Ignore.
case *Subscription, *Message:
case *Subscription, *Message, *PushNotificationMessage:
timer.Reset(c.chanSendTimeout)
select {
case c.allCh <- msg:
Expand Down
Loading
Loading