Description
There is a problem with the current implementation of this lib: if a server starts a goroutine that calls ServeHTTP and then calls Publish, the message can be ignored.
ServeHTTP calls Subscribe here:
Lines 168 to 175 in e3ddbdf
    if err = s.provider.Subscribe(r.Context(), sub); err != nil {
        if l != nil {
            l.Error("sse: subscribe error", "error", err)
        }
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
which by default writes the subscription to a channel:
Line 131 in e3ddbdf
    case j.subscription <- subscription{done: done, Subscription: sub}:
which ideally should be processed and stored here:
Lines 262 to 277 in e3ddbdf
    case sub := <-j.subscription:
        var err error
        if replay != nil {
            err = tryReplay(sub.Subscription, &replay)
        }
        // NOTE(tmaxmax): We can't meaningfully handle replay panics in any way
        // other than disabling replay altogether. This ensures uptime
        // in the face of unexpected – returning the panic as an error
        // to the subscriber doesn't make sense, as it's probably not the subscriber's fault.
        if _, isPanic := err.(replayPanic); err != nil && !isPanic { //nolint:errorlint // it's our error
            sub.done <- err
            close(sub.done)
        } else {
            j.subscribers[sub.done] = sub.Subscription
        }
All of this should happen before Publish causes the list of subscribers to be checked:
Lines 242 to 243 in e3ddbdf
    for done, sub := range j.subscribers {
        if topicsIntersect(sub.Topics, msg.topics) {
If the last two steps happen in the wrong order, the result is data loss: a message that was published and is expected to be delivered can be silently ignored by this library.
I believe there should be some way for the server to signal when the subscriber has been stored, i.e. some kind of channel should be written to after this:
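To make the ordering problem concrete, here is a minimal, deterministic model of the two event-loop cases. It is only a sketch: `toyBroker`, `store`, `publish` and `deliveries` are hypothetical names made up for illustration and are not part of this library.

```go
package main

import "fmt"

// toyBroker is a hypothetical stand-in for the provider's event loop state.
// It is NOT the library's type.
type toyBroker struct {
	subscribers map[string]chan string // subscriber ID -> delivery channel
}

// store models the loop's subscription case: the subscriber becomes visible.
func (b *toyBroker) store(id string) chan string {
	ch := make(chan string, 1) // buffered so publish never blocks in this model
	b.subscribers[id] = ch
	return ch
}

// publish models the loop's publish case: only subscribers that are already
// stored receive the message. It returns how many subscribers got it.
func (b *toyBroker) publish(msg string) int {
	n := 0
	for _, ch := range b.subscribers {
		ch <- msg
		n++
	}
	return n
}

// deliveries runs the two steps in the given order and reports how many
// subscribers received the message.
func deliveries(subscribeFirst bool) int {
	b := &toyBroker{subscribers: map[string]chan string{}}
	if subscribeFirst {
		b.store("client")
		return b.publish("hello")
	}
	n := b.publish("hello")
	b.store("client")
	return n
}

func main() {
	fmt.Println("subscribe then publish:", deliveries(true))
	fmt.Println("publish then subscribe:", deliveries(false))
}
```

In the real server the order is decided by goroutine scheduling rather than a flag, which is why the caller currently cannot rule out the second outcome.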
Lines 275 to 277 in e3ddbdf
    } else {
        j.subscribers[sub.done] = sub.Subscription
    }
Ideally, I'd expect some sort of Ready channel that the caller can pass to Server, which the server writes to after the subscription has been stored on the provider.
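A rough sketch of what I have in mind, using a toy provider rather than the real one. Everything here (`toyProvider`, `subscribeReq`, the `ready` field, `publishAfterReady`) is a hypothetical name for illustration only and is not an existing API of this library. The loop closes `ready` right after storing the subscriber, so the caller can wait on it before publishing.

```go
package main

import "fmt"

// subscribeReq is a hypothetical request type; ready is the proposed signal.
type subscribeReq struct {
	out   chan string   // where messages are delivered
	ready chan struct{} // closed by the loop once the subscriber is stored
}

// toyProvider is a simplified stand-in for a channel-based provider.
type toyProvider struct {
	subscribe chan subscribeReq
	publish   chan string
}

func newToyProvider() *toyProvider {
	p := &toyProvider{
		subscribe: make(chan subscribeReq),
		publish:   make(chan string),
	}
	go p.run() // no shutdown path, for brevity
	return p
}

// run is the single event loop that owns the subscriber list.
func (p *toyProvider) run() {
	var subs []chan string
	for {
		select {
		case req := <-p.subscribe:
			subs = append(subs, req.out)
			close(req.ready) // signal: the subscriber is now stored
		case msg := <-p.publish:
			for _, out := range subs {
				out <- msg
			}
		}
	}
}

// publishAfterReady subscribes from a separate goroutine, waits for the
// ready signal, then publishes and returns what the subscriber received.
func publishAfterReady(msg string) string {
	p := newToyProvider()
	out := make(chan string, 1)
	ready := make(chan struct{})
	go func() { p.subscribe <- subscribeReq{out: out, ready: ready} }()
	<-ready // without this wait, the publish could win the race
	p.publish <- msg
	return <-out
}

func main() {
	fmt.Println(publishAfterReady("hello"))
}
```

Closing the channel instead of sending on it means the loop never blocks on a caller that is not listening, and any number of waiters can observe the signal.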