-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[CAE-1088] feat: RESP3 notifications support #3418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 16 commits
b02eed6
1ff0ded
e6e2cea
d7fbe18
1331fb9
4747610
70231ae
958fb1a
79f6df2
c33b157
fdfcf94
be9b6dd
8006fab
d1d4529
a2de263
ad16b21
d3f6197
e6c5590
03bfd9f
9a7a5c8
ada72ce
91805bc
e31987f
075b930
f7948b5
3473c1e
d820ade
b6e712b
f66518c
f4ff2d6
cb8a4e5
c44c8b5
47dd490
1606de8
d530d45
5972b4c
ec4bf57
b4d0ff1
84123b1
d780401
604c8e3
b23f43c
7a0f316
225c0bf
2681d6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |||||||||
"time" | ||||||||||
|
||||||||||
"github.com/redis/go-redis/v9/internal" | ||||||||||
"github.com/redis/go-redis/v9/internal/proto" | ||||||||||
) | ||||||||||
|
||||||||||
var ( | ||||||||||
|
@@ -71,6 +72,11 @@ type Options struct { | |||||||||
MaxActiveConns int | ||||||||||
ConnMaxIdleTime time.Duration | ||||||||||
ConnMaxLifetime time.Duration | ||||||||||
|
||||||||||
// Push notification processor for connections | ||||||||||
PushNotificationProcessor interface { | ||||||||||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
type lastDialErrorWrap struct { | ||||||||||
|
@@ -228,6 +234,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { | |||||||||
|
||||||||||
cn := NewConn(netConn) | ||||||||||
cn.pooled = pooled | ||||||||||
|
||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. drop this line |
||||||||||
// Set push notification processor if available | ||||||||||
if p.cfg.PushNotificationProcessor != nil { | ||||||||||
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor | ||||||||||
} | ||||||||||
|
||||||||||
return cn, nil | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -377,9 +389,24 @@ func (p *ConnPool) popIdle() (*Conn, error) { | |||||||||
|
||||||||||
func (p *ConnPool) Put(ctx context.Context, cn *Conn) { | ||||||||||
if cn.rd.Buffered() > 0 { | ||||||||||
ndyakov marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
internal.Logger.Printf(ctx, "Conn has unread data") | ||||||||||
p.Remove(ctx, cn, BadConnError{}) | ||||||||||
return | ||||||||||
// Check if this might be push notification data | ||||||||||
if cn.PushNotificationProcessor != nil { | ||||||||||
// Try to process pending push notifications before discarding connection | ||||||||||
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd) | ||||||||||
if err != nil { | ||||||||||
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err) | ||||||||||
} | ||||||||||
// Check again if there's still unread data after processing push notifications | ||||||||||
if cn.rd.Buffered() > 0 { | ||||||||||
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications") | ||||||||||
p.Remove(ctx, cn, BadConnError{}) | ||||||||||
return | ||||||||||
} | ||||||||||
Comment on lines
+393
to
+394
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This early
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually correct, instead of just returning, we should allow the connection to be put back in the pool |
||||||||||
} else { | ||||||||||
internal.Logger.Printf(ctx, "Conn has unread data") | ||||||||||
p.Remove(ctx, cn, BadConnError{}) | ||||||||||
return | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
if !cn.pooled { | ||||||||||
|
@@ -523,8 +550,24 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool { | |||||||||
return false | ||||||||||
} | ||||||||||
|
||||||||||
if connCheck(cn.netConn) != nil { | ||||||||||
return false | ||||||||||
// Check connection health, but be aware of push notifications | ||||||||||
if err := connCheck(cn.netConn); err != nil { | ||||||||||
// If there's unexpected data and we have push notification support, | ||||||||||
// it might be push notifications | ||||||||||
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil { | ||||||||||
// Try to process any pending push notifications | ||||||||||
ctx := context.Background() | ||||||||||
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil { | ||||||||||
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr) | ||||||||||
return false | ||||||||||
} | ||||||||||
// Check again after processing push notifications | ||||||||||
if connCheck(cn.netConn) != nil { | ||||||||||
return false | ||||||||||
} | ||||||||||
} else { | ||||||||||
return false | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
cn.SetUsedAt(now) | ||||||||||
|
Uh oh!
There was an error while loading. Please reload this page.